You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/28 18:21:36 UTC

svn commit: r580403 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/ src/qpid/ src/qpid/broker/ src/qpid/client/ src/qpid/log/ src/tests/

Author: aconway
Date: Fri Sep 28 09:21:34 2007
New Revision: 580403

URL: http://svn.apache.org/viewvc?rev=580403&view=rev
Log:

	* src/tests/ClientSessionTest.cpp: Suspend/resume tests.
	
	* broker/SessionManager.cpp, broker/SessionHandler.cpp:
	Implement suspend/resume

	* client/ScopedAssociation.h, SessionCore.h, SessionHandler.h:
	  Simplified relationships.
	  - Removed ScopedAssociation.
	  - SessionHandler: is now a member of SessionCore.
	  - SessionCore: shared_ptr ownership by Session(s) and ConnectionImpl.
	  - Using framing::FrameHandler interfaces.

Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Fri Sep 28 09:21:34 2007
@@ -23,7 +23,7 @@
   end
 
   def declare_method (m)
-    param_unpackers = m.fields.collect { |f| "args[#{f.cppname}|#{f.cpptype.default_value}]" }
+    param_unpackers = m.fields.collect { |f| "args[::qpid::client::#{f.cppname}|#{f.cpptype.default_value}]" }
     if (m.content())
       param_names = m.param_names + ["content"]
       param_unpackers << "args[content|DefaultContent(\"\")]"
@@ -89,14 +89,14 @@
       gen "){\n\n"
     end
     indent (2) { 
-      gen "return #{return_type(m)}(impl()->send(#{m.body_name}(" 
+      gen "return #{return_type(m)}(impl->send(#{m.body_name}(" 
       params = ["version"] + m.param_names
       gen params.join(", ")
       other_params=[]
       if (m.content())
-        gen "), content), impl());\n"
+        gen "), content), impl);\n"
       else
-        gen ")), impl());\n"
+        gen ")), impl);\n"
       end
     }
     gen "}\n\n"
@@ -127,6 +127,7 @@
 #include <sstream> 
 #include <boost/parameter.hpp>
 #include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/framing/amqp_structs.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/MethodContent.h"
@@ -134,8 +135,9 @@
 #include "qpid/client/Completion.h"
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Response.h"
-#include "qpid/client/ScopedAssociation.h"
+#include "qpid/client/SessionCore.h"
 #include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
 
 namespace qpid {
 namespace client {
@@ -145,25 +147,26 @@
 using framing::FieldTable;
 using framing::MethodContent;
 using framing::SequenceNumberSet;
+using framing::Uuid;
 
 EOS
       declare_keywords(@amqp.classes.select { |c| !excludes.include?(c.name)  })
       genl 
       gen <<EOS
 class #{@classname} {
-  ScopedAssociation::shared_ptr assoc;
+  shared_ptr<SessionCore> impl;
   framing::ProtocolVersion version;
-  
-  SessionCore::shared_ptr impl();
-
-public:
     #{@classname}();
-    #{@classname}(ScopedAssociation::shared_ptr);
+    #{@classname}(shared_ptr<SessionCore>);
 
-    framing::FrameSet::shared_ptr get() { return impl()->get(); }
-    void setSynchronous(bool sync) { impl()->setSync(sync); } 
+  friend class Connection;
+public:
+    framing::FrameSet::shared_ptr get() { return impl->get(); }
+    Uuid getId() const { return impl->getId(); }
+    void setSynchronous(bool sync) { impl->setSync(sync); } 
+    void suspend();
     void close();
-    Execution& execution() { return impl()->getExecution(); }
+    Execution& execution() { return impl->getExecution(); }
 
     typedef framing::TransferContent DefaultContent;
 EOS
@@ -188,18 +191,10 @@
 namespace client {
 
 #{@classname}::#{@classname}() {}
-#{@classname}::#{@classname}(ScopedAssociation::shared_ptr _assoc) : assoc(_assoc) {}
+#{@classname}::#{@classname}(shared_ptr<SessionCore> core) : impl(core) {}
 
-SessionCore::shared_ptr #{@classname}::impl()
-{
-    if (!assoc) throw Exception("Uninitialised session");
-    return assoc->session;
-}
-
-void #{@classname}::close()
-{
-    impl()->close(); 
-}
+void #{@classname}::suspend() { impl->suspend(); }
+void #{@classname}::close() { impl->close(); }
 
 EOS
 

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 28 09:21:34 2007
@@ -310,7 +310,6 @@
   qpid/client/MessageListener.h \
   qpid/client/MessageQueue.h \
   qpid/client/Response.h \
-  qpid/client/ScopedAssociation.h \
   qpid/client/SessionCore.h \
   qpid/client/SessionHandler.h \
   qpid/client/StateManager.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Fri Sep 28 09:21:34 2007
@@ -32,7 +32,7 @@
 }
 
 static void ctorLog(const std::exception* e) {
-    QPID_LOG(trace, "Exception constructor " << typeid(e).name() << ": " << e->what());
+    QPID_LOG(trace, "Exception: " << e->what());
 }
     
 Exception::Exception() throw() { ctorLog(this); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Sep 28 09:21:34 2007
@@ -78,7 +78,6 @@
     void idleIn();
     void closed();
 
-    // FIXME aconway 2007-08-30: When does closeChannel close the session?
     void closeChannel(framing::ChannelId channel);
 
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -35,7 +35,7 @@
       connection(c), channel(ch), proxy(out),
       ignoring(false) {}
 
-SessionHandler::~SessionHandler() { }
+SessionHandler::~SessionHandler() {}
 
 namespace {
 ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
@@ -78,18 +78,15 @@
 void SessionHandler::assertOpen(const char* method) {
      if (!session.get())
         throw ChannelErrorException(
-            QPID_MSG(""<<method<<" failed: No session for channel "
+            QPID_MSG(method << " failed: No session for channel "
                      << getChannel()));
 }
 
 void SessionHandler::assertClosed(const char* method) {
-    // FIXME aconway 2007-08-31: Should raise channel-busy, need
-    // to update spec.
     if (session.get())
-        throw PreconditionFailedException(
-            QPID_MSG(""<<method<<" failed: "
-                     << channel << " already open on channel "
-                     << getChannel()));
+        throw ChannelBusyException(
+            QPID_MSG(method << " failed: channel " << channel
+                     << " is already open."));
 }
 
 void  SessionHandler::open(uint32_t detachedLifetime) {
@@ -100,6 +97,12 @@
     getProxy().getSession().attached(session->getId(), session->getTimeout());
 }
 
+void  SessionHandler::resume(const Uuid& id) {
+    assertClosed("resume");
+    session = connection.broker.getSessionManager().resume(*this, id);
+    getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
 void  SessionHandler::flow(bool /*active*/) {
     // FIXME aconway 2007-09-19: Removed in 0-10, remove 
     assert(0); throw NotImplementedException();
@@ -115,26 +118,23 @@
     ignoring=false;
     session.reset();
     getProxy().getSession().closed(REPLY_SUCCESS, "ok");
-    // No need to remove from connection map, will be re-used
-    // if channel is re-opened.
+    assert(&connection.getChannel(channel) == this);
+    connection.closeChannel(channel); 
 }
 
 void  SessionHandler::closed(uint16_t replyCode, const string& replyText) {
-    // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
-    // to text names.
     QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
     ignoring=false;
     session.reset();
-    // No need to remove from connection map, will be re-used
-    // if channel is re-opened.
-}
-
-void  SessionHandler::resume(const Uuid& /*sessionId*/) {
-    assert(0); throw NotImplementedException();
 }
 
 void  SessionHandler::suspend() {
-    assert(0); throw NotImplementedException();
+    assertOpen("suspend");
+    connection.broker.getSessionManager().suspend(session);
+    assert(!session.get());
+    getProxy().getSession().detached();
+    assert(&connection.getChannel(channel) == this);
+    connection.closeChannel(channel); 
 }
 
 void  SessionHandler::ack(uint32_t     /*cumulativeSeenMark*/,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Fri Sep 28 09:21:34 2007
@@ -22,19 +22,22 @@
 #include "SessionManager.h"
 #include "SessionState.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
 #include "qpid/memory.h"
 
 #include <boost/bind.hpp>
+#include <boost/range.hpp>
 
 #include <algorithm>
 #include <functional>
+#include <ostream>
 
 namespace qpid {
 namespace broker {
 
 using namespace sys;
 using namespace framing;
-using std::make_pair;
 
 SessionManager::SessionManager() {}
 
@@ -51,12 +54,16 @@
 
 void  SessionManager::suspend(std::auto_ptr<SessionState> session) {
     Mutex::ScopedLock l(lock);
-    session->expiry = AbsTime(now(),session->getTimeout());
+    active.erase(session->getId());
+    session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+    session->handler = 0;
     suspended.push_back(session.release()); // In expiry order
     eraseExpired();
 }
 
-std::auto_ptr<SessionState>  SessionManager::resume(const Uuid& id) {
+std::auto_ptr<SessionState>  SessionManager::resume(
+    SessionHandler& sh, const Uuid& id)
+{
     Mutex::ScopedLock l(lock);
     eraseExpired();
     if (active.find(id) != active.end()) 
@@ -70,15 +77,20 @@
         throw InvalidArgumentException(
             QPID_MSG("No suspended session with id=" << id));
     active.insert(id);
-    return make_auto_ptr(suspended.release(i).release());
+    std::auto_ptr<SessionState> state(suspended.release(i).release());
+    state->handler = &sh;
+    return state;
 }
 
 void SessionManager::eraseExpired() {
     // Called with lock held.
-    Suspended::iterator i = std::lower_bound(
-        suspended.begin(), suspended.end(), now(), 
-        boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
-    suspended.erase(suspended.begin(), i);
+    if (!suspended.empty()) {
+        Suspended::iterator keep = std::lower_bound(
+            suspended.begin(), suspended.end(), now(),
+            boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
+        QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+        suspended.erase(suspended.begin(), keep);
+    }
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Fri Sep 28 09:21:34 2007
@@ -47,8 +47,7 @@
     SessionManager();
     ~SessionManager();
     /** Open a new active session, caller takes ownership */
-    std::auto_ptr<SessionState> open(
-        SessionHandler& h, uint32_t timeout_);
+    std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
     
     /** Suspend a session, start it's timeout counter.
      * The factory takes ownership.
@@ -58,7 +57,7 @@
     /** Resume a suspended session.
      *@throw Exception if timed out or non-existant.
      */
-    std::auto_ptr<SessionState> resume(const framing::Uuid& id);
+    std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
 
   private:
     typedef boost::ptr_vector<SessionState> Suspended;
@@ -69,7 +68,7 @@
     Active active;
 
     void eraseExpired();             
-  friend class SessionState; // removes deleted sessions from active set. 
+  friend class SessionState; // removes deleted sessions from active set.
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 28 09:21:34 2007
@@ -32,6 +32,7 @@
 
 #include <set>
 #include <vector>
+#include <ostream>
 
 namespace qpid {
 
@@ -79,7 +80,7 @@
     uint32_t getTimeout() const { return timeout; }
     Broker& getBroker() { return broker; }
     framing::ProtocolVersion getVersion() const { return version; }
-
+    
   private:
     /** Only SessionManager can open sessions */
     SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
@@ -95,6 +96,11 @@
 
   friend class SessionManager;
 };
+
+
+inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
+    return out << session.getId();
+}
 
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep 28 09:21:34 2007
@@ -25,7 +25,7 @@
 #include "Connection.h"
 #include "Channel.h"
 #include "Message.h"
-#include "ScopedAssociation.h"
+#include "SessionCore.h"
 #include "qpid/log/Logger.h"
 #include "qpid/log/Options.h"
 #include "qpid/log/Statement.h"
@@ -70,16 +70,22 @@
     channel.open(newSession());
 }
 
-Session Connection::newSession() {
-    ChannelId id = ++channelIdCounter;
-    SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
-    ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
-    session->open();
-    return Session(assoc);
+Session Connection::newSession(uint32_t detachedLifetime) {
+    shared_ptr<SessionCore> core(
+        new SessionCore(*impl, ++channelIdCounter, max_frame_size));
+    impl->addSession(core);
+    core->open(detachedLifetime);
+    return Session(core);
 }
 
-void Connection::close()
-{
+void Connection::resume(Session& session) {
+    shared_ptr<SessionCore> core=session.impl;
+    core->setChannel(++channelIdCounter);
+    impl->addSession(core);
+    core->resume(*impl);
+}
+
+void Connection::close() {
     impl->close();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Sep 28 09:21:34 2007
@@ -28,7 +28,7 @@
 #include "ConnectionImpl.h"
 #include "qpid/client/Session.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
-
+#include "qpid/framing/Uuid.h"
 
 namespace qpid {
 
@@ -122,7 +122,25 @@
      */
     void openChannel(Channel&);
 
-    Session newSession();
+    /**
+     * Create a new session on this connection.  Sessions allow
+     * multiple streams of work to be multiplexed over the same
+     * connection.
+     *
+     *@param detachedLifetime: A session may be detached from its
+     * channel, either by calling Session::suspend() or because of a
+     * network failure. The session state is perserved for
+     * detachedLifetime seconds to allow a call to resume(). After
+     * that the broker may discard the session state. Default is 0,
+     * meaning the session cannot be resumed.
+     */
+    Session newSession(uint32_t detachedLifetime=0);
+
+    /**
+     * Resume a suspendded session. A session may be resumed
+     * on a different connection to the one that created it.
+     */
+    void resume(Session& session);
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Sep 28 09:21:34 2007
@@ -18,7 +18,11 @@
  * under the License.
  *
  */
+#include "qpid/framing/reply_exceptions.h"
+
 #include "ConnectionImpl.h"
+#include "SessionCore.h"
+
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -26,7 +30,8 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false)
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
+    : connector(c), isClosed(false)
 {
     handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
     handler.out = boost::bind(&Connector::send, connector, _1);
@@ -37,22 +42,13 @@
     connector->setShutdownHandler(this);
 }
 
-void ConnectionImpl::allocated(SessionCore::shared_ptr session)
-{
-    Mutex::ScopedLock l(lock);
-    if (sessions.find(session->getId()) != sessions.end()) {
-        throw Exception("Id already in use.");
-    }
-    sessions[session->getId()] = session;
-}
-
-void ConnectionImpl::released(SessionCore::shared_ptr session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
 {
     Mutex::ScopedLock l(lock);
-    SessionMap::iterator i = sessions.find(session->getId());
-    if (i != sessions.end()) {
-        sessions.erase(i);
-    }
+    boost::shared_ptr<SessionCore>& s = sessions[session->getChannel()];
+    if (s)
+        throw ChannelBusyException();
+    s = session;
 }
 
 void ConnectionImpl::handle(framing::AMQFrame& frame)
@@ -62,7 +58,14 @@
 
 void ConnectionImpl::incoming(framing::AMQFrame& frame)
 {
-    find(frame.getChannel())->handle(frame);
+    boost::shared_ptr<SessionCore> s;
+    {
+        Mutex::ScopedLock l(lock);
+        s = sessions[frame.getChannel()];
+    }
+    if (!s)
+        throw ChannelErrorException();
+    s->in(frame);
 }
 
 void ConnectionImpl::open(const std::string& host, int port,
@@ -117,21 +120,10 @@
 {
     Mutex::ScopedLock l(lock);
     for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
-        Mutex::ScopedUnlock u(lock);
         i->second->closed(code, text);
     }
     sessions.clear();
     isClosed = true;
-}
-
-SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
-{
-    Mutex::ScopedLock l(lock);
-    SessionMap::iterator i = sessions.find(id);
-    if (i == sessions.end()) {
-        throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
-    }
-    return i->second;
 }
 
 void ConnectionImpl::assertNotClosed()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Sep 28 09:21:34 2007
@@ -30,17 +30,18 @@
 #include "qpid/sys/TimeoutHandler.h"
 #include "ConnectionHandler.h"
 #include "Connector.h"
-#include "SessionCore.h"
 
 namespace qpid {
 namespace client {
 
+class SessionCore;
+
 class ConnectionImpl : public framing::FrameHandler,
-        public sys::TimeoutHandler, 
-        public sys::ShutdownHandler
+                       public sys::TimeoutHandler, 
+                       public sys::ShutdownHandler
 
 {
-    typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap;
+    typedef std::map<uint16_t, boost::shared_ptr<SessionCore> > SessionMap;
     SessionMap sessions; 
     ConnectionHandler handler;
     boost::shared_ptr<Connector> connector;
@@ -56,14 +57,12 @@
     void shutdown();
     void signalClose(uint16_t, const std::string&);
     void assertNotClosed();
-    SessionCore::shared_ptr find(uint16_t);
-
 public:
     typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
 
     ConnectionImpl(boost::shared_ptr<Connector> c);
-    void allocated(SessionCore::shared_ptr);
-    void released(SessionCore::shared_ptr);
+    void addSession(const boost::shared_ptr<SessionCore>&);
+        
     void open(const std::string& host, int port = 5672, 
               const std::string& uid = "guest",
               const std::string& pwd = "guest", 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -170,7 +170,7 @@
     if(l) {
         completion.listenForResult(id, l);
     }
-    AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+    AMQFrame frame(0/*channel will be filled in by channel handler*/, command);
     if (hasContent) {
         frame.setEof(false);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Sep 28 09:21:34 2007
@@ -38,7 +38,7 @@
 
 class ExecutionHandler : 
     private framing::AMQP_ServerOperations::ExecutionHandler,
-    public ChainableFrameHandler,
+    public framing::FrameHandler,
     public Execution
 {
     framing::SequenceNumber incomingCounter;
@@ -66,9 +66,14 @@
 public:
     typedef CompletionTracker::ResultListener ResultListener;
 
+    // Allow other classes to set the out handler.
+    framing::FrameHandler::Chain out;
+
     ExecutionHandler(uint64_t maxFrameSize = 65536);
 
+    // Incoming handler. 
     void handle(framing::AMQFrame& frame);
+    
     framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
     framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, 
                                  ResultListener=ResultListener());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Sep 28 09:21:34 2007
@@ -20,27 +20,25 @@
  */
 
 #include "SessionCore.h"
-#include <boost/bind.hpp>
+#include "qpid/framing/constants.h"
 #include "Future.h"
 #include "FutureResponse.h"
 #include "FutureResult.h"
 
+#include <boost/bind.hpp>
+
 using namespace qpid::client;
 using namespace qpid::framing;
 
-SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out, 
-                         uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
+SessionCore::SessionCore(FrameHandler& out_, uint16_t ch, uint64_t maxFrameSize)
+    : channel(ch), l2(*this), l3(maxFrameSize), uuid(false), sync(false)
 {
-    l2.out = boost::bind(&FrameHandler::handle, out, _1);
-    l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
-    l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1);
-    l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
+    l2.next = &l3;
+    l3.out = &out;
+    out.next = &out_;
 }
 
-void SessionCore::open()
-{
-    l2.open(id);
-}
+SessionCore::~SessionCore() {}
 
 ExecutionHandler& SessionCore::getExecution()
 {
@@ -50,6 +48,7 @@
 
 FrameSet::shared_ptr SessionCore::get()
 {
+    checkClosed();
     return l3.getDemux().getDefault().pop();
 }
 
@@ -63,38 +62,55 @@
     return sync;
 }
 
-void SessionCore::close()
-{
-    l2.close();
-    stop();
+namespace {
+struct ClosedOnExit {
+    SessionCore& core;
+    int code;
+    std::string text;
+    ClosedOnExit(SessionCore& s, int c, const std::string& t)
+        : core(s), code(c), text(t) {}
+    ~ClosedOnExit() { core.closed(code, text); }
+};
 }
 
-void SessionCore::stop()
+void SessionCore::close()
 {
-    l3.getDemux().close();
-    l3.getCompletionTracker().close();
+    checkClosed();
+    ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
+    l2.close();
 }
 
-void SessionCore::handle(AMQFrame& frame)
-{
-    l2.incoming(frame);
+void SessionCore::suspend() {
+    checkClosed();
+    ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
+    l2.suspend();
 }
 
 void SessionCore::closed(uint16_t code, const std::string& text)
 {
-    stop();
-
-    isClosed = true;
+    out.next = 0;
     reason.code = code;
     reason.text = text;
+    l2.closed();
+    l3.getDemux().close();
+    l3.getCompletionTracker().close();
 }
 
-void SessionCore::checkClosed()
+void SessionCore::checkClosed() const
 {
-    if (isClosed) {
-        //TODO: could actually have been a connection exception
+    // TODO: could have been a connection exception
+    if(out.next == 0)
         throw ChannelException(reason.code, reason.text);
-    }
+}
+
+void SessionCore::open(uint32_t detachedLifetime) {
+    assert(out.next);
+    l2.open(detachedLifetime);
+}
+
+void SessionCore::resume(FrameHandler& out_) {
+    out.next = &out_;
+    l2.resume();
 }
 
 Future SessionCore::send(const AMQBody& command)
@@ -131,3 +147,15 @@
     //send method impl:
     return Future(l3.send(command, content));
 }
+
+void SessionCore::handleIn(AMQFrame& frame) {
+    l2.handle(frame);
+}
+
+void SessionCore::handleOut(AMQFrame& frame)
+{
+    checkClosed();
+    frame.setChannel(channel);
+    out.next->handle(frame);
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Fri Sep 28 09:21:34 2007
@@ -28,6 +28,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
+#include "qpid/framing/Uuid.h"
 #include "SessionHandler.h"
 #include "ExecutionHandler.h"
 
@@ -36,7 +37,12 @@
 
 class Future;
 
-class SessionCore : public framing::FrameHandler
+/**
+ * Session implementation, sets up handler chains.
+ * Attaches to a SessionHandler when active, detaches
+ * when closed.
+ */
+class SessionCore : public framing::FrameHandler::InOutHandler
 {
     struct Reason
     {
@@ -44,33 +50,49 @@
         std::string text;
     };
 
-    ExecutionHandler l3;
+    uint16_t channel;
     SessionHandler l2;
-    const uint16_t id;
+    ExecutionHandler l3;
+    framing::Uuid uuid;
     bool sync;
-    bool isClosed;
     Reason reason;
+
+  protected:
+    void handleIn(framing::AMQFrame& frame);
+    void handleOut(framing::AMQFrame& frame);
+
+  public:
+    typedef shared_ptr<SessionCore> shared_ptr;
     
-public:    
-    typedef boost::shared_ptr<SessionCore> shared_ptr;
+    SessionCore(framing::FrameHandler& out, uint16_t channel, uint64_t maxFrameSize);
+    ~SessionCore();
 
-    SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
     framing::FrameSet::shared_ptr get();
-    uint16_t getId() const { return id; } 
-    void setSync(bool);
-    bool isSync();
-    void open();
+
+    framing::Uuid getId() const { return uuid; } 
+    void setId(const framing::Uuid& id)  { uuid= id; }
+        
+    uint16_t getChannel() const { assert(channel); return channel; }
+    void setChannel(uint16_t ch) { assert(ch); channel=ch; }
+
+    void open(uint32_t detachedLifetime);
+
+    /** Closed by client code */
     void close();
-    void stop();
+
+    /** Closed by peer */
     void closed(uint16_t code, const std::string& text);
-    void checkClosed();
+
+    void resume(framing::FrameHandler& out);
+    void suspend();
+
+    void setSync(bool);
+    bool isSync();
     ExecutionHandler& getExecution();
+    void checkClosed() const;
 
     Future send(const framing::AMQBody& command);
     Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-    
-    //for incoming frames:
-    void handle(framing::AMQFrame& frame);    
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -22,31 +22,44 @@
 #include "SessionHandler.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace boost;
 
-SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
+namespace {
+// TODO aconway 2007-09-28: hack till we have multi-version support.
+ProtocolVersion version;
+}
+
+SessionHandler::SessionHandler(SessionCore& parent)
+    : StateManager(CLOSED), core(parent) {}
+
+SessionHandler::~SessionHandler() {}
 
-void SessionHandler::incoming(AMQFrame& frame)
+void SessionHandler::handle(AMQFrame& frame)
 {
     AMQBody* body = frame.getBody();
     if (getState() == OPEN) {
-        SessionClosedBody* closeBody=
+        core.checkClosed();
+        SessionClosedBody* closedBody=
             dynamic_cast<SessionClosedBody*>(body->getMethod());
-        if (closeBody) {
-            setState(CLOSED_BY_PEER);
-            code = closeBody->getReplyCode();
-            text = closeBody->getReplyText();
-            if (onClose) {
-                onClose(closeBody->getReplyCode(), closeBody->getReplyText());
-            }
+        if (closedBody) {
+            closed();
+            core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
         } else {
             try {
-                in(frame);
-            }catch(ChannelException& e){
-                closed(e.code, e.toString());
+                next->handle(frame);
+            }
+            catch(const ChannelException& e){
+                QPID_LOG(error, "Channel exception:" << e.what());
+                closed();
+                AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
+                core.out(f);
+                core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
             }
         }
     } else {
@@ -57,69 +70,63 @@
     }
 }
 
-void SessionHandler::outgoing(AMQFrame& frame)
-{
-    if (getState() == OPEN) {
-        frame.setChannel(id);
-        out(frame);
-    } else if (getState() == CLOSED) {
-        throw Exception(QPID_MSG("Channel not open, can't send " << frame));
-    } else if (getState() == CLOSED_BY_PEER) {
-        throw ChannelException(code, text);
-    }
-}
-
-void SessionHandler::open(uint16_t _id)
+void SessionHandler::attach(const AMQMethodBody& command)
 {
-    id = _id;
-
     setState(OPENING);
-    // FIXME aconway 2007-09-19: Need to get this from API.
-    AMQFrame f(id, SessionOpenBody(version, 0));
-    out(f);
-
+    AMQFrame f(0, command);
+    core.out(f);
     std::set<int> states;
     states.insert(OPEN);
-    states.insert(CLOSED_BY_PEER);
+    states.insert(CLOSED);
     waitFor(states);
-    if (getState() != OPEN) {
-        throw Exception("Failed to open channel.");
-    }
+    if (getState() != OPEN) 
+        throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
+}
+
+void SessionHandler::open(uint32_t detachedLifetime) {
+    attach(SessionOpenBody(version, detachedLifetime));
 }
 
-void SessionHandler::close()
+void SessionHandler::resume() {
+    attach(SessionResumeBody(version, core.getId()));
+}
+
+void SessionHandler::detach(const AMQMethodBody& command)
 {
     setState(CLOSING);
-    AMQFrame f(id, SessionCloseBody(version));
-    out(f);
+    AMQFrame f(0, command);
+    core.out(f);
     waitFor(CLOSED);
 }
 
-void SessionHandler::closed(uint16_t code, const std::string& msg)
-{
-    setState(CLOSED);
-    AMQFrame f(id, SessionClosedBody(version, code, msg));
-    out(f);
-}
+void SessionHandler::close() { detach(SessionCloseBody(version)); }
+void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
+void SessionHandler::closed() { setState(CLOSED); }
 
 void SessionHandler::handleMethod(AMQMethodBody* method)
 {
     switch (getState()) {
-      case OPENING:
-        if (method->isA<SessionAttachedBody>()) {
-            setState(OPEN);
-        } else {
-            throw ConnectionException(504, "Channel not opened.");
-        }
-        break;
+      case OPENING: {
+          SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
+          if (attached) {
+              core.setId(attached->getSessionId());
+              setState(OPEN);
+          } else 
+              throw ChannelErrorException();
+          break;
+      }
       case CLOSING:
-        if (method->isA<SessionClosedBody>()) {
-            setState(CLOSED);
-        } //else just ignore it
+        if (method->isA<SessionClosedBody>() ||
+            method->isA<SessionDetachedBody>())
+            closed();
         break;
+        
       case CLOSED:
-        throw ConnectionException(504, "Channel is closed.");
+        throw ChannelErrorException();
+        
       default:
-        throw Exception("Unexpected state encountered in SessionHandler!");
+        assert(0);
+        throw InternalErrorException(QPID_MSG("Internal Error."));
     }
 }
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h Fri Sep 28 09:21:34 2007
@@ -22,36 +22,40 @@
 #define _SessionHandler_
 
 #include "StateManager.h"
-#include "ChainableFrameHandler.h"
+#include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/shared_ptr.h"
 
 namespace qpid {
 namespace client {
+class SessionCore;
 
-class SessionHandler : private StateManager, public ChainableFrameHandler
+/**
+ * Handles incoming session (L2) commands.
+ */
+class SessionHandler : public framing::FrameHandler,
+                       private StateManager
 {
-    enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
-    framing::ProtocolVersion version;
-    uint16_t id;
+    enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+    SessionCore& core;
     
-    uint16_t code;
-    std::string text;
-
     void handleMethod(framing::AMQMethodBody* method);
-    void closed(uint16_t code, const std::string& msg);
-
-public:
-    typedef boost::function<void(uint16_t, const std::string&)> CloseListener;    
-
-    SessionHandler();
+    void attach(const framing::AMQMethodBody&);
+    void detach(const framing::AMQMethodBody&);
+    
+  public:
+    SessionHandler(SessionCore& parent);
+    ~SessionHandler();
 
-    void incoming(framing::AMQFrame& frame);
-    void outgoing(framing::AMQFrame& frame);
+    /** Incoming from broker */
+    void handle(framing::AMQFrame&);
 
-    void open(uint16_t id);
+    void open(uint32_t detachedLifetime);
+    void resume();
     void close();
-    
-    CloseListener onClose;
+    void closed();              
+    void suspend();
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Fri Sep 28 09:21:34 2007
@@ -113,7 +113,16 @@
             stmt_.log(QPID_LOG_STRINGSTREAM(message));                \
     } while(0)
 
-
+/**
+ * Macro for complicated logging logic that can't fit in a simple QPID_LOG
+ * statement. For example:
+ * @code
+ * QPID_IF_LOG(debug) {
+ *   message = do_complicated_stuff;
+ *   QPID_LOG(debug, message);
+ * }
+ */
+#define QPID_IF_LOG(level)
 }} // namespace qpid::log
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Sep 28 09:21:34 2007
@@ -60,6 +60,8 @@
     CPPUNIT_TEST(testQueueQuery);
     CPPUNIT_TEST(testTransfer);
     CPPUNIT_TEST(testDispatcher);
+    CPPUNIT_TEST(testSuspendResume);
+    CPPUNIT_TEST(testSuspendResumeErrors);
     CPPUNIT_TEST_SUITE_END();
 
     boost::shared_ptr<Connector> broker;
@@ -139,6 +141,28 @@
     }
 
     void testSuspendResume() {
+        session = connection.newSession(60);
+        session.suspend();
+        try {
+            session.exchangeQuery_(name="amq.fanout");
+            CPPUNIT_FAIL("Expected session suspended exception");
+        } catch(...) {}
+        connection.resume(session);
+        session.exchangeQuery_(name="amq.fanout");
+        // FIXME aconway 2007-09-25: build up session state and confirm
+        //it survives the resume
+    }
+
+    void testSuspendResumeErrors() {
+        session.suspend();  // session has 0 timeout.
+        try {
+            session.exchangeQuery_(name="amq.fanout");
+            CPPUNIT_FAIL("Expected suspended session exception");
+        } catch(...) {}
+        try {
+            connection.resume(session);
+            CPPUNIT_FAIL("Expected no such session exception.");
+        } catch(...) {}
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Fri Sep 28 09:21:34 2007
@@ -24,6 +24,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/client/Connector.h"
 #include "qpid/client/Connection.h"
+#include "qpid/log/Statement.h"
 
 #include <vector>
 #include <iostream>
@@ -101,7 +102,8 @@
         ) : sender(sender_), conversation(conversation_), in(ih) {}
 
         void send(framing::AMQFrame& frame) {
-            //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl;
+            QPID_LOG(debug,
+                     (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame);
             conversation.push_back(TaggedFrame(sender, frame));
             in->received(frame);
         }