You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/09 21:36:52 UTC

svn commit: r703237 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/client/ qpid/cluster/ tests/

Author: aconway
Date: Thu Oct  9 12:36:51 2008
New Revision: 703237

URL: http://svn.apache.org/viewvc?rev=703237&view=rev
Log:
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Oct  9 12:36:51 2008
@@ -390,6 +390,7 @@
   qpid/client/SessionBase_0_10.cpp		\
   qpid/client/SessionBase_0_10.h		\
   qpid/client/SessionBase_0_10Access.h		\
+  qpid/client/ConnectionAccess.h		\
   qpid/client/SessionImpl.cpp			\
   qpid/client/StateManager.cpp			\
   qpid/client/SubscriptionManager.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu Oct  9 12:36:51 2008
@@ -147,7 +147,7 @@
 
 void SessionState::senderConfirmed(const SessionPoint& confirmed) {
     if (confirmed > sender.sendPoint)
-        throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent."));
+        throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed < " << confirmed << " but only sent < " << sender.sendPoint));
     QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
     ReplayList::iterator i = sender.replayList.begin();
     while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
@@ -169,7 +169,7 @@
     QPID_LOG(debug, getId() << ": sender marked completed: " << commands);
     sender.incomplete -= commands;
     // Completion implies confirmation but we don't handle out-of-order
-    // confirmation, so confirm only the first contiguous range of commands.
+    // confirmation, so confirm up to the end of the first contiguous range of commands.
     senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Thu Oct  9 12:36:51 2008
@@ -44,7 +44,7 @@
 namespace qpid {
 namespace client {
 
-Connection::Connection() : channelIdCounter(0), version(framing::highestProtocolVersion) {}
+Connection::Connection() : version(framing::highestProtocolVersion) {}
 
 Connection::~Connection(){ }
 
@@ -106,26 +106,19 @@
 
     impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
     impl->open();
-    max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
 }
 
-Session Connection::newSession(const std::string& name) {
+Session Connection::newSession(const std::string& name, uint32_t timeout) {
     if (!isOpen())
         throw Exception(QPID_MSG("Connection has not yet been opened"));
-    shared_ptr<SessionImpl> simpl(
-        new SessionImpl(name, impl, ++channelIdCounter, max_frame_size));
-    impl->addSession(simpl);
-    simpl->open(0);
     Session s;
-    SessionBase_0_10Access(s).set(simpl);
+    SessionBase_0_10Access(s).set(impl->newSession(name, timeout));
     return s;
 }
 
 void Connection::resume(Session& session) {
     if (!isOpen())
         throw Exception(QPID_MSG("Connection is not open."));
-
-    session.impl->setChannel(++channelIdCounter);
     impl->addSession(session.impl);
     session.impl->resume(impl);
 }
@@ -134,4 +127,8 @@
     impl->close();
 }
 
+std::vector<Url> Connection::getKnownBrokers() {
+    return isOpen() ? impl->getKnownBrokers() : std::vector<Url>();
+}
+
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Oct  9 12:36:51 2008
@@ -32,6 +32,7 @@
 namespace client {
 
 class ConnectionSettings;
+class ConnectionImpl;
 
 /**
  * Represents a connection to an AMQP broker. All communication is
@@ -42,9 +43,7 @@
  */
 class Connection
 {
-    framing::ChannelId channelIdCounter;
     framing::ProtocolVersion version;
-    uint16_t max_frame_size;
 
   protected:
     boost::shared_ptr<ConnectionImpl> impl;
@@ -55,6 +54,7 @@
      * @see open()
      */
     Connection();
+
     ~Connection();
 
     /**
@@ -157,7 +157,7 @@
      * If the name is empty (the default) then a unique name will be
      * chosen using a Universally-unique identifier (UUID) algorithm.
      */
-    Session newSession(const std::string& name=std::string());
+    Session newSession(const std::string& name=std::string(), uint32_t timeoutSeconds = 0);
 
     /**
      * Resume a suspended session. A session may be resumed
@@ -167,7 +167,8 @@
 
     bool isOpen() const;
 
-    
+    std::vector<Url> getKnownBrokers();
+
   friend class ConnectionAccess; ///<@internal
   friend class SessionBase_0_10; ///<@internal
 };

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h?rev=703237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h Thu Oct  9 12:36:51 2008
@@ -0,0 +1,41 @@
+#ifndef QPID_CLIENT_CONNECTIONACCESS_H
+#define QPID_CLIENT_CONNECTIONACCESS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+
+/**@file @internal  Internal use only */
+
+namespace qpid {
+namespace client {
+
+
+
+struct ConnectionAccess {
+    static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
+    static boost::shared_ptr<ConnectionImpl>  getImpl(Connection& c) { return c.impl; }
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_CONNECTIONACCESS_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionAccess.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Oct  9 12:36:51 2008
@@ -22,8 +22,10 @@
 #include "Connector.h"
 #include "ConnectionSettings.h"
 #include "SessionImpl.h"
+#include "FailoverListener.h"
 
 #include "qpid/log/Statement.h"
+#include "qpid/Url.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 
@@ -40,7 +42,9 @@
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
-      version(v)
+      failover(new FailoverListener()),
+      version(v),
+      nextChannel(1)
 {
     QPID_LOG(debug, "ConnectionImpl created for " << version);
     handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
@@ -56,12 +60,14 @@
     // Important to close the connector first, to ensure the
     // connector thread does not call on us while the destructor
     // is running.
-    if (connector) connector->close(); 
+    failover.reset();
+    if (connector) connector->close();
 }
 
-void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
 {
     Mutex::ScopedLock l(lock);
+    session->setChannel(channel ? channel : nextChannel++);
     boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
     if (s.lock()) throw SessionBusyException();
     s = session;
@@ -102,7 +108,9 @@
     connector->setShutdownHandler(this);
     connector->connect(host, port);
     connector->init();
-    handler.waitForOpen();    
+    handler.waitForOpen();
+
+    if (failover.get()) failover->start(shared_from_this());
 }
 
 void ConnectionImpl::idleIn()
@@ -162,3 +170,16 @@
     return handler;
 }
     
+std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
+    // FIXME aconway 2008-10-08: initialize failover list from openOk or settings
+    return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>();
+}
+
+boost::shared_ptr<SessionImpl>  ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
+    boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this()));
+    addSession(simpl, channel);
+    simpl->open(timeout);
+    return simpl;
+}
+
+void ConnectionImpl::stopFailoverListener() { failover.reset(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Oct  9 12:36:51 2008
@@ -24,6 +24,7 @@
 
 #include "Bounds.h"
 #include "ConnectionHandler.h"
+
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/ShutdownHandler.h"
@@ -41,6 +42,7 @@
 class Connector;
 class ConnectionSettings;
 class SessionImpl;
+class FailoverListener;
 
 class ConnectionImpl : public Bounds,
                        public framing::FrameHandler,
@@ -54,7 +56,9 @@
     SessionMap sessions; 
     ConnectionHandler handler;
     boost::scoped_ptr<Connector> connector;
+    boost::scoped_ptr<FailoverListener> failover;
     framing::ProtocolVersion version;
+    uint16_t nextChannel;
     sys::Mutex lock;
 
     template <class F> void closeInternal(const F&);
@@ -72,13 +76,18 @@
     void open();
     bool isOpen() const;
 
-    void addSession(const boost::shared_ptr<SessionImpl>&);
+    boost::shared_ptr<SessionImpl> newSession(const std::string& name, uint32_t timeout, uint16_t channel=0);
+    void addSession(const boost::shared_ptr<SessionImpl>&, uint16_t channel=0);
         
     void close();
     void handle(framing::AMQFrame& frame);
     void erase(uint16_t channel);
-
+    void stopFailoverListener();
+    
     const ConnectionSettings& getNegotiatedSettings();
+
+    std::vector<Url> getKnownBrokers();
+
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Thu Oct  9 12:36:51 2008
@@ -19,27 +19,50 @@
  *
  */
 #include "FailoverListener.h"
+#include "SessionBase_0_10Access.h"
+#include "qpid/client/SubscriptionManager.h"
 
 namespace qpid {
 namespace client {
 
 static const std::string AMQ_FAILOVER("amq.failover");
 
-FailoverListener::FailoverListener(Connection c)
-    : connection(c), session(c.newSession()), subscriptions(session)
-{
+static Session makeSession(boost::shared_ptr<SessionImpl> si) {
+    // Hold only a weak pointer to the ConnectionImpl so a
+    // FailoverListener in a ConnectionImpl won't createa a shared_ptr
+    // cycle.
+    // 
+    si->setWeakPtr(true);
+    Session s;
+    SessionBase_0_10Access(s).set(si);
+    return s;
+}
+
+FailoverListener::FailoverListener() {}
+
+void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) {
+    Session session = makeSession(c->newSession(std::string(), 0));
+    if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
+        session.close();
+        return;
+    }
+    subscriptions.reset(new SubscriptionManager(session));
     std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
-    if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty())
-        return;                 // Failover exchange not implemented.
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
-    subscriptions.subscribe(*this, qname, FlowControl::unlimited());
-    thread = sys::Thread(subscriptions);
+    subscriptions->subscribe(*this, qname, FlowControl::unlimited());
+    thread = sys::Thread(*subscriptions);
 }
 
-FailoverListener::~FailoverListener() {
-    subscriptions.stop();
+void FailoverListener::stop() {
+    if (subscriptions.get()) subscriptions->stop();
     if (thread.id()) thread.join();
+    if (subscriptions.get()) subscriptions->getSession().close();
+    thread=sys::Thread();
+    subscriptions.reset();
+}    
+FailoverListener::~FailoverListener() {
+    stop();
 }
 
 void FailoverListener::received(Message& msg) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Thu Oct  9 12:36:51 2008
@@ -22,10 +22,7 @@
  *
  */
 
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
 #include "qpid/client/MessageListener.h"
-#include "qpid/client/SubscriptionManager.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Thread.h"
@@ -34,22 +31,24 @@
 namespace qpid {
 namespace client {
 
+class SubscriptionManager;
+
 /**
  * @internal Listen for failover updates from the amq.failover exchange.
  */
-class FailoverListener : public MessageListener
-{
+class FailoverListener : public MessageListener {
   public:
-    FailoverListener(Connection);
+    FailoverListener();
     ~FailoverListener();
+    void start(const boost::shared_ptr<ConnectionImpl>&);
+    void stop();
+    
     std::vector<Url> getKnownBrokers() const;
     void received(Message& msg);
     
   private:
     mutable sys::Mutex lock;
-    Connection connection;
-    Session session;
-    SubscriptionManager subscriptions;
+    std::auto_ptr<SubscriptionManager> subscriptions;
     sys::Thread thread;
     std::vector<Url> knowBrokers;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Thu Oct  9 12:36:51 2008
@@ -51,21 +51,19 @@
 typedef sys::ScopedLock<sys::Semaphore>  Acquire;
 
 
-SessionImpl::SessionImpl(const std::string& name,
-                         shared_ptr<ConnectionImpl> conn,
-                         uint16_t ch, uint64_t _maxFrameSize)
+SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> conn)
     : state(INACTIVE),
       detachedLifetime(0),
-      maxFrameSize(_maxFrameSize),
+      maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
       id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
-      connection(conn),
-      ioHandler(*this),
-      channel(ch),
-      proxy(ioHandler),
+      connectionShared(conn),
+      connectionWeak(conn),
+      weakPtr(false),
+      proxy(out),
       nextIn(0),
       nextOut(0)
 {
-    channel.next = connection.get();
+    channel.next = connectionShared.get();
 }
 
 SessionImpl::~SessionImpl() {
@@ -78,7 +76,8 @@
             state.waitWaiters();
         }
     }
-    connection->erase(channel);
+    boost::shared_ptr<ConnectionImpl> c =  connectionWeak.lock();
+    if (c) c->erase(channel);
 }
 
 
@@ -119,6 +118,8 @@
 
 void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread
 {
+    // weakPtr sessions should not be resumed.
+    if (weakPtr) return;
     throw NotImplementedException("Resume not yet implemented by client!");
 }
 
@@ -251,7 +252,6 @@
  */
 void SessionImpl::connectionClosed(uint16_t code, const std::string& text)  {
     setException(createConnectionException(code, text));
-    // FIXME aconway 2008-10-07: Should closing a connection detach or close its sessions?
     handleClosed();
 }
 
@@ -259,9 +259,7 @@
  * Called by ConnectionImpl to notify active sessions when connection
  * is disconnected
  */
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) 
-{
-    // FIXME aconway 2008-10-07: distinguish disconnect from clean close.  
+void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
     connectionClosed(_code, _text);
 }
 
@@ -426,14 +424,11 @@
 
 void SessionImpl::handleOut(AMQFrame& frame) // user thread
 {
-    connection->expand(frame.encodedSize(), true);
-    channel.handle(frame);
-}
-
-void SessionImpl::proxyOut(AMQFrame& frame) // network thread
-{
-    connection->expand(frame.encodedSize(), false);
-    channel.handle(frame);
+    boost::shared_ptr<ConnectionImpl> c =  connectionWeak.lock();
+    if (c) {
+        c->expand(frame.encodedSize(), true);
+        channel.handle(frame);
+    }
 }
 
 void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -602,11 +597,11 @@
                             const std::string& description,
                             const framing::FieldTable& /*errorInfo*/)
 {
-    QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description 
-             << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
-
     Lock l(state);
     setExceptionLH(createSessionException(errorCode, description));
+    QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() 
+             << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
+
     if (detachedLifetime) 
         setTimeout(0);
 }
@@ -648,8 +643,6 @@
 
 void SessionImpl::handleClosed()
 {
-    // FIXME aconway 2008-06-12: needs to be set to the correct exception type.
-    // 
     demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder);
     results.close();
 }
@@ -662,4 +655,16 @@
     return detachedLifetime;
 }
 
+uint32_t SessionImpl::getTimeout() const {
+    return detachedLifetime;
+}
+
+void SessionImpl::setWeakPtr(bool weak) {
+    weakPtr = weak;
+    if (weakPtr)
+        connectionShared.reset();   // Only keep weak pointer
+    else
+        connectionShared = connectionWeak.lock();
+}
+
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Thu Oct  9 12:36:51 2008
@@ -28,7 +28,8 @@
 
 #include "qpid/SessionId.h"
 #include "qpid/SessionState.h"
-#include "qpid/shared_ptr.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/weak_ptr.hpp"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/ChannelHandler.h"
 #include "qpid/framing/SequenceNumber.h"
@@ -63,7 +64,7 @@
                     private framing::AMQP_ClientOperations::ExecutionHandler
 {
 public:
-    SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+    SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
     ~SessionImpl();
 
 
@@ -106,6 +107,11 @@
     /** Get timeout in seconds. */
     uint32_t getTimeout() const;
 
+    /** Make this session use a weak_ptr to the ConnectionImpl.
+     * Used for sessions created by the ConnectionImpl itself.
+     */
+    void setWeakPtr(bool weak=true);
+
 private:
     enum State {
         INACTIVE,
@@ -131,7 +137,6 @@
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
-    void proxyOut(framing::AMQFrame& frame);
     void deliver(framing::AMQFrame& frame);
 
     Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
@@ -175,8 +180,11 @@
     const uint64_t maxFrameSize;
     const SessionId id;
 
-    shared_ptr<ConnectionImpl> connection;
-    framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
+    shared_ptr<ConnectionImpl> connection();
+    shared_ptr<ConnectionImpl> connectionShared;
+    boost::weak_ptr<ConnectionImpl> connectionWeak;
+    bool weakPtr;
+
     framing::ChannelHandler channel;
     framing::AMQP_ServerProxy::Session proxy;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Thu Oct  9 12:36:51 2008
@@ -198,10 +198,10 @@
      * Default is to acknowledge every message automatically.
      */
     void setAckPolicy(const AckPolicy& autoAck);
-    /**
-     *
-     */
+
      AckPolicy& getAckPolicy();
+
+    Session getSession() const { return session; }
 };
 
 /** AutoCancel cancels a subscription in its destructor */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Oct  9 12:36:51 2008
@@ -123,6 +123,8 @@
 
 void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
     Lock l(lock);
+    // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map?
+    // esp shadow connections? See race comment in getConnection.
     assert(!c->isCatchUp());
     connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
 }
@@ -204,15 +206,18 @@
 }
 
 boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&)  {
-    if (connectionId.getMember() == memberId)
-        return boost::intrusive_ptr<Connection>(connectionId.getPointer());
     ConnectionMap::iterator i = connections.find(connectionId);
-    if (i == connections.end()) { // New shadow connection.
-        assert(connectionId.getMember() != memberId);
-        std::ostringstream mgmtId;
-        mgmtId << name.str() << ":"  << connectionId;
-        ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
-        i = connections.insert(value).first;
+    if (i == connections.end()) { 
+        if (connectionId.getMember() == memberId) { // Closed local connection
+            QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId);
+            return boost::intrusive_ptr<Connection>();
+        }
+        else {                  // New shadow connection
+            std::ostringstream mgmtId;
+            mgmtId << name.str() << ":"  << connectionId;
+            ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
+            i = connections.insert(value).first;
+        }
     }
     return i->second;
 }
@@ -261,15 +266,17 @@
             }
         }
         else {                      // e.isConnection()
-            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); 
-            if (e.getType() == DATA) {
-                QPID_LOG(trace, *this << " PROC: " << e);
-                connection->deliverBuffer(buf);
-            }
-            else {              // control
-                while (frame.decode(buf)) {
-                    QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
-                    connection->delivered(frame);
+            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+            if (connection) {   // Ignore if no connection.
+                if (e.getType() == DATA) {
+                    QPID_LOG(trace, *this << " PROC: " << e);
+                    connection->deliverBuffer(buf);
+                }
+                else {              // control
+                    while (frame.decode(buf)) {
+                        QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+                        connection->delivered(frame);
+                    }
                 }
             }
         }
@@ -333,7 +340,7 @@
     Mutex::ScopedLock l(lock);
     QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) 
              << AddrList(left, nLeft, "( ", ")"));
-    bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+    map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
     if (state == LEFT) return;
     if (!map.isAlive(memberId)) { leave(l); return; } 
     
@@ -350,7 +357,7 @@
             QPID_LOG(debug, *this << " send dump-request " << myUrl);
         }
     }
-    else if (state >= READY && changed) 
+    else if (state >= READY)
         memberUpdate(l);
 }
 
@@ -408,8 +415,8 @@
 }
 
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
-    if (map.ready(id, Url(url)))
-        memberUpdate(l);
+    map.ready(id, Url(url));
+    memberUpdate(l);
 }
 
 void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct  9 12:36:51 2008
@@ -23,6 +23,7 @@
 #include "ClusterMap.h"
 #include "Connection.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
+#include "qpid/client/ConnectionAccess.h" 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
@@ -41,14 +42,6 @@
 #include <boost/bind.hpp>
 
 namespace qpid {
-
-namespace client {
-struct ConnectionAccess {
-    static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
-    static boost::shared_ptr<ConnectionImpl>  getImpl(Connection& c) { return c.impl; }
-};
-} // namespace client
-
 namespace cluster {
 
 using broker::Broker;
@@ -169,10 +162,15 @@
 void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
     QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
     shadowConnection = catchUpConnection();
+
     broker::Connection& bc = dumpConnection->getBrokerConnection();
     // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
     // authentication etc. See ConnectionSettings.
     shadowConnection.open(dumpeeUrl, bc.getUserId());
+
+    // Stop the failover listener as its session will conflict with re-creating-sessions
+    client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener();
+    
     dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
         dumpConnection->getId().getMember(),
@@ -184,26 +182,21 @@
 void DumpClient::dumpSession(broker::SessionHandler& sh) {
     QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection()  << "[" << sh.getChannel() << "] = "
              << sh.getSession()->getId());
-    broker::SessionState* s = sh.getSession();
-    if (!s) return;         // no session.
+    broker::SessionState* ss = sh.getSession();
+    if (!ss) return;            // no session.
 
-    // Re-create the session.
+    // Create a client session to dump session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
-    size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
-    boost::shared_ptr<client::SessionImpl> simpl(
-        new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size));
-    cimpl->addSession(simpl);
-    simpl->open(sh.getSession()->getTimeout());
+    boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
     client::SessionBase_0_10Access(shadowSession).set(simpl);
     AMQP_AllProxy::ClusterConnection proxy(simpl->out);
 
     // Re-create session state on remote connection.
-    broker::SessionState* ss = sh.getSession();
 
     // For reasons unknown, boost::bind does not work here with boost 1.33.
     ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
     
-    // FIXME aconway 2008-09-19: remaining session state.
+    // FIXME aconway 2008-09-19: update remaining session state.
 
     // Reset command-sequence state.
     proxy.sessionState(
@@ -216,7 +209,7 @@
         ss->receiverGetIncomplete()
     );
 
-    // FIXME aconway 2008-09-23: session replay list.
+    // FIXME aconway 2008-09-23: update session replay list.
 
     QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Oct  9 12:36:51 2008
@@ -58,9 +58,7 @@
 // Called in write thread when the IO layer has no more data to write.
 // We do nothing in the write thread, we run doOutput only on delivery
 // of doOutput requests.
-bool  OutputInterceptor::doOutput() {
-    return false;
-}
+bool  OutputInterceptor::doOutput() { return false; }
 
 // Delivery of doOutput allows us to run the real connection doOutput()
 // which stocks up the write buffers with data.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=703237&r1=703236&r2=703237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct  9 12:36:51 2008
@@ -22,6 +22,7 @@
 #include "BrokerFixture.h"
 
 #include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionAccess.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/FailoverListener.h"
 #include "qpid/cluster/Cluster.h"
@@ -176,47 +177,14 @@
     return s;
 }
 
-std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) {
-    std::set<uint16_t> ports;
-    for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) {
-        Url url((*i)->get<std::string>());
-        BOOST_REQUIRE(url.size() > 0);
-        BOOST_REQUIRE(url[0].get<TcpAddress>());
-        ports.insert(url[0].get<TcpAddress>()->port);
-    }
-    return ports;
-}
-
-std::set<uint16_t> portsFromFailoverMessage(const Message& m) {
-    framing::Array urlArray;
-    m.getHeaders().getArray("amq.failover", urlArray);
-    return portsFromFailoverArray(urlArray);
-}
-
-QPID_AUTO_TEST_CASE(FailoverExchange) {
-    ClusterFixture cluster(2);
-    Client c0(cluster[0], "c0");
-    c0.session.queueDeclare("q");
-    c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover");
-
-    Message m;
-    BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
-    BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m));
-
-    cluster.add();
-    BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
-    BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m));
-}
-
-std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) {
-    // Wait till there are n ports in the list.
-    vector<Url> kb = fl.getKnownBrokers();
-    for (size_t retry=1000; kb.size() != n && retry != 0; --retry) {
+template <class T>  std::set<uint16_t> knownBrokerPorts(T& source, size_t n) {
+    vector<Url> urls = source.getKnownBrokers();
+    for (size_t retry=1000; urls.size() != n && retry != 0; --retry) {
         ::usleep(1000);
-        kb = fl.getKnownBrokers();
+        urls = source.getKnownBrokers();
     }
     set<uint16_t> s;
-    for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) {
+    for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
         BOOST_MESSAGE("Failover URL: " << *i);      
         BOOST_CHECK(i->size() >= 1);
         BOOST_CHECK((*i)[0].get<TcpAddress>());
@@ -226,17 +194,29 @@
 }
 
 QPID_AUTO_TEST_CASE(testFailoverListener) {
-    ClusterFixture cluster(1);
+    ClusterFixture cluster(2);
     Client c0(cluster[0], "c0");
-    FailoverListener fl(c0.connection);
+    FailoverListener fl;
+    fl.start(ConnectionAccess::getImpl(c0.connection));
+    set<uint16_t> set0=makeSet(cluster);
 
+    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+    cluster.add();
+    BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3));
+    cluster.kill(2);
+    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+    ClusterFixture cluster(2);
+    Client c0(cluster[0], "c0");
     set<uint16_t> set0=makeSet(cluster);
 
-    BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
     cluster.add();
-    BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2));
-    cluster.kill(1);
-    BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+    BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3));
+    cluster.kill(2);
+    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
 }
 
 QPID_AUTO_TEST_CASE(DumpConsumers) {