You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/10/26 21:11:09 UTC

svn commit: r829931 - in /qpid/trunk/qpid/cpp: include/qpid/client/ src/ src/qpid/client/ src/qpid/client/amqp0_10/ src/tests/

Author: aconway
Date: Mon Oct 26 20:11:08 2009
New Revision: 829931

URL: http://svn.apache.org/viewvc?rev=829931&view=rev
Log:
Separate FailoverListener from client::Connection.

client::ConnectionImpl used to contain a FailoverListener to subscribe
for updates on the amq.failover exchange. This caused some lifecycle
issues including memory leaks.

Now FailoverListener is a public API class that the user must create
associated with a session to get known-broker updates.

Removed the weak_ptr logic in client::SessionImpl which was only
required because of FailoverListener.

Made SessionImpl::close() idempotent. Gets rid of spurious warning
messages in some tests.

Added:
    qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h
      - copied, changed from r829912, qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
Modified:
    qpid/trunk/qpid/cpp/include/qpid/client/Connection.h
    qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/exception_test.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/Connection.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/client/Connection.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/Connection.h Mon Oct 26 20:11:08 2009
@@ -200,7 +200,11 @@
 
     QPID_CLIENT_EXTERN bool isOpen() const;
 
-    QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers();
+    /** In a cluster, returns the initial set of known broker URLs
+     * at the time of connection.
+     */
+    QPID_CLIENT_EXTERN std::vector<Url> getInitialBrokers();
+
     QPID_CLIENT_EXTERN void registerFailureCallback ( boost::function<void ()> fn );
 
     /**

Copied: qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h (from r829912, qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h?p2=qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h&p1=qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h&r1=829912&r2=829931&rev=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/FailoverListener.h Mon Oct 26 20:11:08 2009
@@ -22,7 +22,11 @@
  *
  */
 
+#include "qpid/client/ClientImportExport.h"
 #include "qpid/client/MessageListener.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
@@ -32,26 +36,43 @@
 namespace qpid {
 namespace client {
 
-class SubscriptionManager;
-class ConnectionImpl;
 
 /**
- * @internal Listen for failover updates from the amq.failover exchange.
+ * Listen for updates from the amq.failover exchange.
+ *
+ * In a cluster, the amq.failover exchange provides updates whenever
+ * the cluster membership changes. This class subscribes to the
+ * failover exchange and providees the latest list of known brokers.
+ *
+ * You can also subscribe to amq.failover yourself and use
+ * FailoverListener::decode to extract a list of broker URLs from a
+ * failover exchange message.
  */
-class FailoverListener : public MessageListener, private qpid::sys::Runnable 
+class FailoverListener : private MessageListener, private qpid::sys::Runnable 
 {
   public:
-    FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls);
-    ~FailoverListener();
-    void stop();
+    /** The name of the standard failover exchange amq.failover */
+    static QPID_CLIENT_EXTERN const std::string AMQ_FAILOVER;
 
-    std::vector<Url> getKnownBrokers() const;
-    void received(Message& msg);
-    void run();
+    /** Extract the broker list from a failover exchange message */
+    static QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers(const Message& m);
+    
+    /** Subscribe to amq.failover exchange. */
+    QPID_CLIENT_EXTERN FailoverListener(Connection);
+
+    QPID_CLIENT_EXTERN ~FailoverListener();
+
+    /** Returns the latest list of known broker URLs. */
+    QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers() const;
     
   private:
+    void received(Message& msg);
+    void run();
+
     mutable sys::Mutex lock;
-    std::auto_ptr<SubscriptionManager> subscriptions;
+    Connection connection;
+    Session session;
+    SubscriptionManager subscriptions;
     sys::Thread thread;
     std::vector<Url> knownBrokers;
 };

Modified: qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/client/FailoverManager.h Mon Oct 26 20:11:08 2009
@@ -22,12 +22,13 @@
  *
  */
 
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionSettings.h"
 #include "qpid/Exception.h"
 #include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Monitor.h"
 #include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/sys/Monitor.h"
 #include <vector>
 
 namespace qpid {
@@ -123,6 +124,7 @@
 
     qpid::sys::Monitor lock;
     Connection connection;
+    std::auto_ptr<FailoverListener> failoverListener;
     ConnectionSettings settings;
     ReconnectionStrategy* strategy;
     State state;

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 26 20:11:08 2009
@@ -652,7 +652,6 @@
   qpid/client/Dispatcher.h			\
   qpid/client/Execution.h			\
   qpid/client/FailoverListener.cpp		\
-  qpid/client/FailoverListener.h		\
   qpid/client/FailoverManager.cpp		\
   qpid/client/Future.cpp			\
   qpid/client/FutureCompletion.cpp		\
@@ -744,6 +743,7 @@
   ../include/qpid/client/Completion.h		\
   ../include/qpid/client/Connection.h		\
   ../include/qpid/client/ConnectionSettings.h	\
+  ../include/qpid/client/FailoverListener.h	\
   ../include/qpid/client/FailoverManager.h	\
   ../include/qpid/client/FlowControl.h		\
   ../include/qpid/client/Future.h		\

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Mon Oct 26 20:11:08 2009
@@ -152,8 +152,8 @@
         impl->close();
 }
 
-std::vector<Url> Connection::getKnownBrokers() {
-    return impl ? impl->getKnownBrokers() : std::vector<Url>();
+std::vector<Url> Connection::getInitialBrokers() {
+    return impl ? impl->getInitialBrokers() : std::vector<Url>();
 }
 
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -22,7 +22,6 @@
 #include "qpid/client/Connector.h"
 #include "qpid/client/ConnectionSettings.h"
 #include "qpid/client/SessionImpl.h"
-#include "qpid/client/FailoverListener.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
@@ -88,7 +87,6 @@
     // Important to close the connector first, to ensure the
     // connector thread does not call on us while the destructor
     // is running.
-    failover.reset();
     if (connector) connector->close();
 }
 
@@ -175,7 +173,6 @@
     } else {
         QPID_LOG(debug, "No security layer in place");
     }
-    failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
 }
 
 void ConnectionImpl::idleIn()
@@ -256,8 +253,8 @@
     return handler;
 }
 
-std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
-    return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
+std::vector<qpid::Url> ConnectionImpl::getInitialBrokers() {
+    return handler.knownBrokersUrls;
 }
 
 boost::shared_ptr<SessionImpl>  ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
@@ -267,6 +264,4 @@
     return simpl;
 }
 
-void ConnectionImpl::stopFailoverListener() { failover->stop(); }
-
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Mon Oct 26 20:11:08 2009
@@ -42,7 +42,6 @@
 class Connector;
 struct ConnectionSettings;
 class SessionImpl;
-class FailoverListener;
 
 class ConnectionImpl : public Bounds,
                        public framing::FrameHandler,
@@ -58,7 +57,6 @@
     SessionMap sessions; 
     ConnectionHandler handler;
     boost::scoped_ptr<Connector> connector;
-    boost::scoped_ptr<FailoverListener> failover;
     framing::ProtocolVersion version;
     uint16_t nextChannel;
     sys::Mutex lock;
@@ -90,9 +88,8 @@
     void erase(uint16_t channel);
     const ConnectionSettings& getNegotiatedSettings();
 
-    std::vector<Url> getKnownBrokers();
+    std::vector<Url> getInitialBrokers();
     void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
-    void stopFailoverListener();
 
     framing::ProtocolVersion getVersion() { return version; }
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Mon Oct 26 20:11:08 2009
@@ -19,11 +19,7 @@
  *
  */
 #include "qpid/client/FailoverListener.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/SubscriptionImpl.h"
-#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Session.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include "qpid/log/Helpers.h"
@@ -31,83 +27,46 @@
 namespace qpid {
 namespace client {
 
-static const std::string AMQ_FAILOVER("amq.failover");
+const std::string FailoverListener::AMQ_FAILOVER("amq.failover");
 
-static Session makeSession(boost::shared_ptr<SessionImpl> si) {
-    // Hold only a weak pointer to the ConnectionImpl so a
-    // FailoverListener in a ConnectionImpl won't createa a shared_ptr
-    // cycle.
-    // 
-    si->setWeakPtr(true);
-    Session s;
-    SessionBase_0_10Access(s).set(si);
-    return s;
-}
-
-FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls)
-  : knownBrokers(initUrls) 
- {
-    // Special versions used to mark cluster catch-up connections
-    // which do not need a FailoverListener
-    if (c->getVersion().getMajor() >= 0x80)  {
-        QPID_LOG(debug, "No failover listener for catch-up connection.");
-        return;
-    }
-
-    Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
+FailoverListener::FailoverListener(Connection c) :
+    connection(c),
+    session(c.newSession(AMQ_FAILOVER+"."+framing::Uuid(true).str())),
+    subscriptions(session)
+{
+    knownBrokers = c.getInitialBrokers();
     if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
         session.close();
         return;
     }
-    subscriptions.reset(new SubscriptionManager(session));
     std::string qname=session.getId().getName();
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
-    subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
+    subscriptions.subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(),
+                                                                ACCEPT_MODE_NONE));
     thread = sys::Thread(*this);
 }
 
-void FailoverListener::run() 
-{
+void FailoverListener::run() {
     try {
-        subscriptions->run();
-    } catch (const TransportFailure&) {
-    } catch (const std::exception& e) {
-        QPID_LOG(error, QPID_MSG(e.what()));
-    }
+        subscriptions.run();
+    } catch(...) {}
 }
 
 FailoverListener::~FailoverListener() {
-    try { stop(); }
-    catch (const std::exception& /*e*/) {}
-}
-
-void FailoverListener::stop() {
-    if (subscriptions.get()) 
-        subscriptions->stop();
-
-    if (thread.id() == sys::Thread::current().id()) {
-        // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
-        // dtor runs when my session drops its weak pointer lock.
-        // For now, leak subscriptions to prevent a core if we delete
-        // without joining.
-        subscriptions.release();
-    }
-    else if (thread.id()) {
+    try {
+        subscriptions.stop();
         thread.join();
-        thread=sys::Thread();
-        subscriptions.reset();  // Safe to delete after join.
-    }
+        if (connection.isOpen()) {
+            session.sync();
+            session.close();
+        }
+    } catch (...) {}
 }
 
 void FailoverListener::received(Message& msg) {
     sys::Mutex::ScopedLock l(lock);
-    knownBrokers.clear();
-    framing::Array urlArray;
-    msg.getHeaders().getArray("amq.failover", urlArray);
-    for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i ) 
-        knownBrokers.push_back(Url((*i)->get<std::string>()));
-    QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
+    knownBrokers = getKnownBrokers(msg);
 }
 
 std::vector<Url> FailoverListener::getKnownBrokers() const {
@@ -115,4 +74,16 @@
     return knownBrokers;
 }
 
+std::vector<Url> FailoverListener::getKnownBrokers(const Message& msg) {
+    std::vector<Url> knownBrokers;
+    framing::Array urlArray;
+    msg.getHeaders().getArray("amq.failover", urlArray);
+    for (framing::Array::ValueVector::const_iterator i = urlArray.begin();
+         i != urlArray.end();
+         ++i ) 
+        knownBrokers.push_back(Url((*i)->get<std::string>()));
+    return knownBrokers;
+}
+
+
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp Mon Oct 26 20:11:08 2009
@@ -77,7 +77,9 @@
         } else {
             state = CONNECTING;
             Connection c;
-            attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+            if (brokers.empty() && failoverListener.get())
+                brokers = failoverListener->getKnownBrokers();
+            attempt(c, settings, brokers);
             if (c.isOpen()) state = IDLE;
             else state = CANT_CONNECT;
             connection = c;
@@ -118,6 +120,7 @@
     try {
         QPID_LOG(info, "Attempting to connect to " << s.host << " on " << s.port << "..."); 
         c.open(s);
+        failoverListener.reset(new FailoverListener(c));
         QPID_LOG(info, "Connected to " << s.host << " on " << s.port); 
     } catch (const Exception& e) {
         QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what()); 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -57,9 +57,7 @@
       detachedLifetime(0),
       maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
       id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
-      connectionShared(conn),
-      connectionWeak(conn),
-      weakPtr(false),
+      connection(conn),
       ioHandler(*this),
       proxy(ioHandler),
       nextIn(0),
@@ -68,7 +66,7 @@
       doClearDeliveryPropertiesExchange(true),
       autoDetach(true)
 {
-    channel.next = connectionShared.get();
+    channel.next = connection.get();
 }
 
 SessionImpl::~SessionImpl() {
@@ -87,8 +85,7 @@
         }
         delete sendMsgCredit;
     }
-    boost::shared_ptr<ConnectionImpl> c =  connectionWeak.lock();
-    if (c) c->erase(channel);
+    connection->erase(channel);
 }
 
 
@@ -122,6 +119,7 @@
 void SessionImpl::close() //user thread
 {
     Lock l(state);
+    if (state == DETACHED || state == DETACHING) return;
     if (detachedLifetime) setTimeout(0);
     detach();
     waitFor(DETACHED);
@@ -129,8 +127,6 @@
 
 void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread
 {
-    // weakPtr sessions should not be resumed.
-    if (weakPtr) return;
     throw NotImplementedException("Resume not yet implemented by client!");
 }
 
@@ -509,11 +505,8 @@
 
 void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock)
 {
-    boost::shared_ptr<ConnectionImpl> c =  connectionWeak.lock();
-    if (c) {
-        channel.handle(frame);
-        c->expand(frame.encodedSize(), canBlock);
-    }
+    channel.handle(frame);
+    connection->expand(frame.encodedSize(), canBlock);
 }
 
 void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -809,17 +802,9 @@
     return detachedLifetime;
 }
 
-void SessionImpl::setWeakPtr(bool weak) {
-    weakPtr = weak;
-    if (weakPtr)
-        connectionShared.reset();   // Only keep weak pointer
-    else
-        connectionShared = connectionWeak.lock();
-}
-
 boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection()
 {
-    return connectionWeak.lock();
+    return connection;
 }
 
 void SessionImpl::disableAutoDetach() { autoDetach = false; }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Oct 26 20:11:08 2009
@@ -120,11 +120,6 @@
     /** Get timeout in seconds. */
     uint32_t getTimeout() const;
 
-    /** Make this session use a weak_ptr to the ConnectionImpl.
-     * Used for sessions created by the ConnectionImpl itself.
-     */
-    void setWeakPtr(bool weak=true);
-
     /** 
      * get the Connection associated with this connection
      */
@@ -224,9 +219,7 @@
     const uint64_t maxFrameSize;
     const SessionId id;
 
-    boost::shared_ptr<ConnectionImpl> connectionShared;
-    boost::weak_ptr<ConnectionImpl> connectionWeak;
-    bool weakPtr;
+    boost::shared_ptr<ConnectionImpl> connection;
 
     framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
     framing::ChannelHandler channel;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Oct 26 20:11:08 2009
@@ -136,7 +136,9 @@
 
 bool ConnectionImpl::tryConnect()
 {
-    if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+    if (tryConnect(url) ||
+        (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
+    {
         return resetSessions();
     } else {
         return false;
@@ -148,6 +150,7 @@
     try {
         QPID_LOG(info, "Trying to connect to " << url << "...");                
         connection.open(u, settings);
+        failoverListener.reset(new FailoverListener(connection));
         return true;
     } catch (const Exception& e) {
         //TODO: need to fix timeout on open so that it throws TransportFailure

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Mon Oct 26 20:11:08 2009
@@ -25,6 +25,7 @@
 #include "qpid/messaging/Variant.h"
 #include "qpid/Url.h"
 #include "qpid/client/Connection.h"
+#include "qpid/client/FailoverListener.h"
 #include "qpid/client/ConnectionSettings.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Semaphore.h"
@@ -50,6 +51,7 @@
     qpid::sys::Mutex lock;//used to protect data structures
     qpid::sys::Semaphore semaphore;//used to coordinate reconnection
     qpid::client::Connection connection;
+    std::auto_ptr<FailoverListener> failoverListener;
     qpid::Url url;
     qpid::client::ConnectionSettings settings;
     Sessions sessions;

Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Mon Oct 26 20:11:08 2009
@@ -126,8 +126,8 @@
     ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
         : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
 
-    ~ClientT() { connection.close(); }
-    void close() { session.close(); connection.close(); }
+    ~ClientT() { close(); }
+    void close() { if (connection.isOpen()) { session.close(); connection.close(); } }
 };
 
 typedef ClientT<> Client;

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Mon Oct 26 20:11:08 2009
@@ -141,13 +141,14 @@
  * Get the known broker ports from a Connection.
  *@param n if specified wait for the cluster size to be n, up to a timeout.
  */
-std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
-    std::vector<qpid::Url> urls = source.getKnownBrokers();
+std::set<int> knownBrokerPorts(qpid::client::Connection& c, int n) {
+    FailoverListener fl(c);
+    std::vector<qpid::Url> urls = fl.getKnownBrokers();
     if (n >= 0 && unsigned(n) != urls.size()) {
         // Retry up to 10 secs in .1 second intervals.
         for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
             qpid::sys::usleep(1000*100); // 0.1 secs
-            urls = source.getKnownBrokers();
+            urls = fl.getKnownBrokers();
         }
     }
     std::set<int> s;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 26 20:11:08 2009
@@ -358,13 +358,15 @@
     rollbackSession.txRollback();
     rollbackSession.messageRelease(rollbackMessage.getId());
 
-
     // Verify queue status: just the comitted messages and dequeues should remain.
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+    commitSession.close();
+    rollbackSession.close();
 }
 
 QPID_AUTO_TEST_CASE(testUnacked) {
@@ -859,9 +861,12 @@
     Receiver receiver(fmgr, "my-queue", "my-data");
     qpid::sys::Thread runner(receiver);
     receiver.waitForReady();
-    cluster.kill(1);
-    //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
-    ::usleep(2*1000*1000);
+    {
+        ScopedSuppressLogging allQuiet; // suppress connection closed messages
+        cluster.kill(1);
+        //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+        ::usleep(2*1000*1000);
+    }
     fmgr.execute(sender);
     runner.join();
     BOOST_CHECK(!receiver.failed);

Modified: qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=829931&r1=829930&r2=829931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Oct 26 20:11:08 2009
@@ -112,8 +112,8 @@
 
     Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
     fix.connection.proxy.close();
-    runner.join();
-    BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
+    runner.join();    
+    BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
 }
 
 QPID_AUTO_TEST_CASE(NoSuchQueueTest) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org