You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/28 18:21:36 UTC
svn commit: r580403 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/
src/ src/qpid/ src/qpid/broker/ src/qpid/client/ src/qpid/log/ src/tests/
Author: aconway
Date: Fri Sep 28 09:21:34 2007
New Revision: 580403
URL: http://svn.apache.org/viewvc?rev=580403&view=rev
Log:
* src/tests/ClientSessionTest.cpp: Suspend/resume tests.
* broker/SessionManager.cpp, broker/SessionHandler.cpp:
Implement suspend/resume
* client/ScopedAssociation.h, SessionCore.h, SessionHandler.h:
Simplified relationships.
- Removed ScopedAssociation.
- SessionHandler: is now a member of SessionCore.
- SessionCore: shared_ptr ownership by Session(s) and ConnectionImpl.
- Using framing::FrameHandler interfaces.
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Fri Sep 28 09:21:34 2007
@@ -23,7 +23,7 @@
end
def declare_method (m)
- param_unpackers = m.fields.collect { |f| "args[#{f.cppname}|#{f.cpptype.default_value}]" }
+ param_unpackers = m.fields.collect { |f| "args[::qpid::client::#{f.cppname}|#{f.cpptype.default_value}]" }
if (m.content())
param_names = m.param_names + ["content"]
param_unpackers << "args[content|DefaultContent(\"\")]"
@@ -89,14 +89,14 @@
gen "){\n\n"
end
indent (2) {
- gen "return #{return_type(m)}(impl()->send(#{m.body_name}("
+ gen "return #{return_type(m)}(impl->send(#{m.body_name}("
params = ["version"] + m.param_names
gen params.join(", ")
other_params=[]
if (m.content())
- gen "), content), impl());\n"
+ gen "), content), impl);\n"
else
- gen ")), impl());\n"
+ gen ")), impl);\n"
end
}
gen "}\n\n"
@@ -127,6 +127,7 @@
#include <sstream>
#include <boost/parameter.hpp>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/framing/amqp_structs.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/MethodContent.h"
@@ -134,8 +135,9 @@
#include "qpid/client/Completion.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Response.h"
-#include "qpid/client/ScopedAssociation.h"
+#include "qpid/client/SessionCore.h"
#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
namespace qpid {
namespace client {
@@ -145,25 +147,26 @@
using framing::FieldTable;
using framing::MethodContent;
using framing::SequenceNumberSet;
+using framing::Uuid;
EOS
declare_keywords(@amqp.classes.select { |c| !excludes.include?(c.name) })
genl
gen <<EOS
class #{@classname} {
- ScopedAssociation::shared_ptr assoc;
+ shared_ptr<SessionCore> impl;
framing::ProtocolVersion version;
-
- SessionCore::shared_ptr impl();
-
-public:
#{@classname}();
- #{@classname}(ScopedAssociation::shared_ptr);
+ #{@classname}(shared_ptr<SessionCore>);
- framing::FrameSet::shared_ptr get() { return impl()->get(); }
- void setSynchronous(bool sync) { impl()->setSync(sync); }
+ friend class Connection;
+public:
+ framing::FrameSet::shared_ptr get() { return impl->get(); }
+ Uuid getId() const { return impl->getId(); }
+ void setSynchronous(bool sync) { impl->setSync(sync); }
+ void suspend();
void close();
- Execution& execution() { return impl()->getExecution(); }
+ Execution& execution() { return impl->getExecution(); }
typedef framing::TransferContent DefaultContent;
EOS
@@ -188,18 +191,10 @@
namespace client {
#{@classname}::#{@classname}() {}
-#{@classname}::#{@classname}(ScopedAssociation::shared_ptr _assoc) : assoc(_assoc) {}
+#{@classname}::#{@classname}(shared_ptr<SessionCore> core) : impl(core) {}
-SessionCore::shared_ptr #{@classname}::impl()
-{
- if (!assoc) throw Exception("Uninitialised session");
- return assoc->session;
-}
-
-void #{@classname}::close()
-{
- impl()->close();
-}
+void #{@classname}::suspend() { impl->suspend(); }
+void #{@classname}::close() { impl->close(); }
EOS
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 28 09:21:34 2007
@@ -310,7 +310,6 @@
qpid/client/MessageListener.h \
qpid/client/MessageQueue.h \
qpid/client/Response.h \
- qpid/client/ScopedAssociation.h \
qpid/client/SessionCore.h \
qpid/client/SessionHandler.h \
qpid/client/StateManager.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Fri Sep 28 09:21:34 2007
@@ -32,7 +32,7 @@
}
static void ctorLog(const std::exception* e) {
- QPID_LOG(trace, "Exception constructor " << typeid(e).name() << ": " << e->what());
+ QPID_LOG(trace, "Exception: " << e->what());
}
Exception::Exception() throw() { ctorLog(this); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Sep 28 09:21:34 2007
@@ -78,7 +78,6 @@
void idleIn();
void closed();
- // FIXME aconway 2007-08-30: When does closeChannel close the session?
void closeChannel(framing::ChannelId channel);
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -35,7 +35,7 @@
connection(c), channel(ch), proxy(out),
ignoring(false) {}
-SessionHandler::~SessionHandler() { }
+SessionHandler::~SessionHandler() {}
namespace {
ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
@@ -78,18 +78,15 @@
void SessionHandler::assertOpen(const char* method) {
if (!session.get())
throw ChannelErrorException(
- QPID_MSG(""<<method<<" failed: No session for channel "
+ QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
void SessionHandler::assertClosed(const char* method) {
- // FIXME aconway 2007-08-31: Should raise channel-busy, need
- // to update spec.
if (session.get())
- throw PreconditionFailedException(
- QPID_MSG(""<<method<<" failed: "
- << channel << " already open on channel "
- << getChannel()));
+ throw ChannelBusyException(
+ QPID_MSG(method << " failed: channel " << channel
+ << " is already open."));
}
void SessionHandler::open(uint32_t detachedLifetime) {
@@ -100,6 +97,12 @@
getProxy().getSession().attached(session->getId(), session->getTimeout());
}
+void SessionHandler::resume(const Uuid& id) {
+ assertClosed("resume");
+ session = connection.broker.getSessionManager().resume(*this, id);
+ getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
void SessionHandler::flow(bool /*active*/) {
// FIXME aconway 2007-09-19: Removed in 0-10, remove
assert(0); throw NotImplementedException();
@@ -115,26 +118,23 @@
ignoring=false;
session.reset();
getProxy().getSession().closed(REPLY_SUCCESS, "ok");
- // No need to remove from connection map, will be re-used
- // if channel is re-opened.
+ assert(&connection.getChannel(channel) == this);
+ connection.closeChannel(channel);
}
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
- // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
- // to text names.
QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
ignoring=false;
session.reset();
- // No need to remove from connection map, will be re-used
- // if channel is re-opened.
-}
-
-void SessionHandler::resume(const Uuid& /*sessionId*/) {
- assert(0); throw NotImplementedException();
}
void SessionHandler::suspend() {
- assert(0); throw NotImplementedException();
+ assertOpen("suspend");
+ connection.broker.getSessionManager().suspend(session);
+ assert(!session.get());
+ getProxy().getSession().detached();
+ assert(&connection.getChannel(channel) == this);
+ connection.closeChannel(channel);
}
void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Fri Sep 28 09:21:34 2007
@@ -22,19 +22,22 @@
#include "SessionManager.h"
#include "SessionState.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
#include "qpid/memory.h"
#include <boost/bind.hpp>
+#include <boost/range.hpp>
#include <algorithm>
#include <functional>
+#include <ostream>
namespace qpid {
namespace broker {
using namespace sys;
using namespace framing;
-using std::make_pair;
SessionManager::SessionManager() {}
@@ -51,12 +54,16 @@
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
- session->expiry = AbsTime(now(),session->getTimeout());
+ active.erase(session->getId());
+ session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+ session->handler = 0;
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
-std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) {
+std::auto_ptr<SessionState> SessionManager::resume(
+ SessionHandler& sh, const Uuid& id)
+{
Mutex::ScopedLock l(lock);
eraseExpired();
if (active.find(id) != active.end())
@@ -70,15 +77,20 @@
throw InvalidArgumentException(
QPID_MSG("No suspended session with id=" << id));
active.insert(id);
- return make_auto_ptr(suspended.release(i).release());
+ std::auto_ptr<SessionState> state(suspended.release(i).release());
+ state->handler = &sh;
+ return state;
}
void SessionManager::eraseExpired() {
// Called with lock held.
- Suspended::iterator i = std::lower_bound(
- suspended.begin(), suspended.end(), now(),
- boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- suspended.erase(suspended.begin(), i);
+ if (!suspended.empty()) {
+ Suspended::iterator keep = std::lower_bound(
+ suspended.begin(), suspended.end(), now(),
+ boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Fri Sep 28 09:21:34 2007
@@ -47,8 +47,7 @@
SessionManager();
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(
- SessionHandler& h, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
/** Suspend a session, start its timeout counter.
* The factory takes ownership.
@@ -58,7 +57,7 @@
/** Resume a suspended session.
*@throw Exception if timed out or non-existent.
*/
- std::auto_ptr<SessionState> resume(const framing::Uuid& id);
+ std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
private:
typedef boost::ptr_vector<SessionState> Suspended;
@@ -69,7 +68,7 @@
Active active;
void eraseExpired();
- friend class SessionState; // removes deleted sessions from active set.
+ friend class SessionState; // removes deleted sessions from active set.
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 28 09:21:34 2007
@@ -32,6 +32,7 @@
#include <set>
#include <vector>
+#include <ostream>
namespace qpid {
@@ -79,7 +80,7 @@
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
-
+
private:
/** Only SessionManager can open sessions */
SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
@@ -95,6 +96,11 @@
friend class SessionManager;
};
+
+
+inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
+ return out << session.getId();
+}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep 28 09:21:34 2007
@@ -25,7 +25,7 @@
#include "Connection.h"
#include "Channel.h"
#include "Message.h"
-#include "ScopedAssociation.h"
+#include "SessionCore.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -70,16 +70,22 @@
channel.open(newSession());
}
-Session Connection::newSession() {
- ChannelId id = ++channelIdCounter;
- SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
- session->open();
- return Session(assoc);
+Session Connection::newSession(uint32_t detachedLifetime) {
+ shared_ptr<SessionCore> core(
+ new SessionCore(*impl, ++channelIdCounter, max_frame_size));
+ impl->addSession(core);
+ core->open(detachedLifetime);
+ return Session(core);
}
-void Connection::close()
-{
+void Connection::resume(Session& session) {
+ shared_ptr<SessionCore> core=session.impl;
+ core->setChannel(++channelIdCounter);
+ impl->addSession(core);
+ core->resume(*impl);
+}
+
+void Connection::close() {
impl->close();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Sep 28 09:21:34 2007
@@ -28,7 +28,7 @@
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-
+#include "qpid/framing/Uuid.h"
namespace qpid {
@@ -122,7 +122,25 @@
*/
void openChannel(Channel&);
- Session newSession();
+ /**
+ * Create a new session on this connection. Sessions allow
+ * multiple streams of work to be multiplexed over the same
+ * connection.
+ *
+ *@param detachedLifetime: A session may be detached from its
+ * channel, either by calling Session::suspend() or because of a
+ * network failure. The session state is preserved for
+ * detachedLifetime seconds to allow a call to resume(). After
+ * that the broker may discard the session state. Default is 0,
+ * meaning the session cannot be resumed.
+ */
+ Session newSession(uint32_t detachedLifetime=0);
+
+ /**
+ * Resume a suspended session. A session may be resumed
+ * on a different connection to the one that created it.
+ */
+ void resume(Session& session);
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Sep 28 09:21:34 2007
@@ -18,7 +18,11 @@
* under the License.
*
*/
+#include "qpid/framing/reply_exceptions.h"
+
#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -26,7 +30,8 @@
using namespace qpid::framing;
using namespace qpid::sys;
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false)
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
+ : connector(c), isClosed(false)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
@@ -37,22 +42,13 @@
connector->setShutdownHandler(this);
}
-void ConnectionImpl::allocated(SessionCore::shared_ptr session)
-{
- Mutex::ScopedLock l(lock);
- if (sessions.find(session->getId()) != sessions.end()) {
- throw Exception("Id already in use.");
- }
- sessions[session->getId()] = session;
-}
-
-void ConnectionImpl::released(SessionCore::shared_ptr session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(session->getId());
- if (i != sessions.end()) {
- sessions.erase(i);
- }
+ boost::shared_ptr<SessionCore>& s = sessions[session->getChannel()];
+ if (s)
+ throw ChannelBusyException();
+ s = session;
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
@@ -62,7 +58,14 @@
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- find(frame.getChannel())->handle(frame);
+ boost::shared_ptr<SessionCore> s;
+ {
+ Mutex::ScopedLock l(lock);
+ s = sessions[frame.getChannel()];
+ }
+ if (!s)
+ throw ChannelErrorException();
+ s->in(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -117,21 +120,10 @@
{
Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- Mutex::ScopedUnlock u(lock);
i->second->closed(code, text);
}
sessions.clear();
isClosed = true;
-}
-
-SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
-{
- Mutex::ScopedLock l(lock);
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- return i->second;
}
void ConnectionImpl::assertNotClosed()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Sep 28 09:21:34 2007
@@ -30,17 +30,18 @@
#include "qpid/sys/TimeoutHandler.h"
#include "ConnectionHandler.h"
#include "Connector.h"
-#include "SessionCore.h"
namespace qpid {
namespace client {
+class SessionCore;
+
class ConnectionImpl : public framing::FrameHandler,
- public sys::TimeoutHandler,
- public sys::ShutdownHandler
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
{
- typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap;
+ typedef std::map<uint16_t, boost::shared_ptr<SessionCore> > SessionMap;
SessionMap sessions;
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
@@ -56,14 +57,12 @@
void shutdown();
void signalClose(uint16_t, const std::string&);
void assertNotClosed();
- SessionCore::shared_ptr find(uint16_t);
-
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
- void allocated(SessionCore::shared_ptr);
- void released(SessionCore::shared_ptr);
+ void addSession(const boost::shared_ptr<SessionCore>&);
+
void open(const std::string& host, int port = 5672,
const std::string& uid = "guest",
const std::string& pwd = "guest",
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -170,7 +170,7 @@
if(l) {
completion.listenForResult(id, l);
}
- AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+ AMQFrame frame(0/*channel will be filled in by channel handler*/, command);
if (hasContent) {
frame.setEof(false);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Sep 28 09:21:34 2007
@@ -38,7 +38,7 @@
class ExecutionHandler :
private framing::AMQP_ServerOperations::ExecutionHandler,
- public ChainableFrameHandler,
+ public framing::FrameHandler,
public Execution
{
framing::SequenceNumber incomingCounter;
@@ -66,9 +66,14 @@
public:
typedef CompletionTracker::ResultListener ResultListener;
+ // Allow other classes to set the out handler.
+ framing::FrameHandler::Chain out;
+
ExecutionHandler(uint64_t maxFrameSize = 65536);
+ // Incoming handler.
void handle(framing::AMQFrame& frame);
+
framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
ResultListener=ResultListener());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Sep 28 09:21:34 2007
@@ -20,27 +20,25 @@
*/
#include "SessionCore.h"
-#include <boost/bind.hpp>
+#include "qpid/framing/constants.h"
#include "Future.h"
#include "FutureResponse.h"
#include "FutureResult.h"
+#include <boost/bind.hpp>
+
using namespace qpid::client;
using namespace qpid::framing;
-SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
- uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
+SessionCore::SessionCore(FrameHandler& out_, uint16_t ch, uint64_t maxFrameSize)
+ : channel(ch), l2(*this), l3(maxFrameSize), uuid(false), sync(false)
{
- l2.out = boost::bind(&FrameHandler::handle, out, _1);
- l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
- l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1);
- l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
+ l2.next = &l3;
+ l3.out = &out;
+ out.next = &out_;
}
-void SessionCore::open()
-{
- l2.open(id);
-}
+SessionCore::~SessionCore() {}
ExecutionHandler& SessionCore::getExecution()
{
@@ -50,6 +48,7 @@
FrameSet::shared_ptr SessionCore::get()
{
+ checkClosed();
return l3.getDemux().getDefault().pop();
}
@@ -63,38 +62,55 @@
return sync;
}
-void SessionCore::close()
-{
- l2.close();
- stop();
+namespace {
+struct ClosedOnExit {
+ SessionCore& core;
+ int code;
+ std::string text;
+ ClosedOnExit(SessionCore& s, int c, const std::string& t)
+ : core(s), code(c), text(t) {}
+ ~ClosedOnExit() { core.closed(code, text); }
+};
}
-void SessionCore::stop()
+void SessionCore::close()
{
- l3.getDemux().close();
- l3.getCompletionTracker().close();
+ checkClosed();
+ ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
+ l2.close();
}
-void SessionCore::handle(AMQFrame& frame)
-{
- l2.incoming(frame);
+void SessionCore::suspend() {
+ checkClosed();
+ ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
+ l2.suspend();
}
void SessionCore::closed(uint16_t code, const std::string& text)
{
- stop();
-
- isClosed = true;
+ out.next = 0;
reason.code = code;
reason.text = text;
+ l2.closed();
+ l3.getDemux().close();
+ l3.getCompletionTracker().close();
}
-void SessionCore::checkClosed()
+void SessionCore::checkClosed() const
{
- if (isClosed) {
- //TODO: could actually have been a connection exception
+ // TODO: could have been a connection exception
+ if(out.next == 0)
throw ChannelException(reason.code, reason.text);
- }
+}
+
+void SessionCore::open(uint32_t detachedLifetime) {
+ assert(out.next);
+ l2.open(detachedLifetime);
+}
+
+void SessionCore::resume(FrameHandler& out_) {
+ out.next = &out_;
+ l2.resume();
}
Future SessionCore::send(const AMQBody& command)
@@ -131,3 +147,15 @@
//send method impl:
return Future(l3.send(command, content));
}
+
+void SessionCore::handleIn(AMQFrame& frame) {
+ l2.handle(frame);
+}
+
+void SessionCore::handleOut(AMQFrame& frame)
+{
+ checkClosed();
+ frame.setChannel(channel);
+ out.next->handle(frame);
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Fri Sep 28 09:21:34 2007
@@ -28,6 +28,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/Uuid.h"
#include "SessionHandler.h"
#include "ExecutionHandler.h"
@@ -36,7 +37,12 @@
class Future;
-class SessionCore : public framing::FrameHandler
+/**
+ * Session implementation, sets up handler chains.
+ * Attaches to a SessionHandler when active, detaches
+ * when closed.
+ */
+class SessionCore : public framing::FrameHandler::InOutHandler
{
struct Reason
{
@@ -44,33 +50,49 @@
std::string text;
};
- ExecutionHandler l3;
+ uint16_t channel;
SessionHandler l2;
- const uint16_t id;
+ ExecutionHandler l3;
+ framing::Uuid uuid;
bool sync;
- bool isClosed;
Reason reason;
+
+ protected:
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ public:
+ typedef shared_ptr<SessionCore> shared_ptr;
-public:
- typedef boost::shared_ptr<SessionCore> shared_ptr;
+ SessionCore(framing::FrameHandler& out, uint16_t channel, uint64_t maxFrameSize);
+ ~SessionCore();
- SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
framing::FrameSet::shared_ptr get();
- uint16_t getId() const { return id; }
- void setSync(bool);
- bool isSync();
- void open();
+
+ framing::Uuid getId() const { return uuid; }
+ void setId(const framing::Uuid& id) { uuid= id; }
+
+ uint16_t getChannel() const { assert(channel); return channel; }
+ void setChannel(uint16_t ch) { assert(ch); channel=ch; }
+
+ void open(uint32_t detachedLifetime);
+
+ /** Closed by client code */
void close();
- void stop();
+
+ /** Closed by peer */
void closed(uint16_t code, const std::string& text);
- void checkClosed();
+
+ void resume(framing::FrameHandler& out);
+ void suspend();
+
+ void setSync(bool);
+ bool isSync();
ExecutionHandler& getExecution();
+ void checkClosed() const;
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-
- //for incoming frames:
- void handle(framing::AMQFrame& frame);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp Fri Sep 28 09:21:34 2007
@@ -22,31 +22,44 @@
#include "SessionHandler.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
+namespace {
+// TODO aconway 2007-09-28: hack till we have multi-version support.
+ProtocolVersion version;
+}
+
+SessionHandler::SessionHandler(SessionCore& parent)
+ : StateManager(CLOSED), core(parent) {}
+
+SessionHandler::~SessionHandler() {}
-void SessionHandler::incoming(AMQFrame& frame)
+void SessionHandler::handle(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- SessionClosedBody* closeBody=
+ core.checkClosed();
+ SessionClosedBody* closedBody=
dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closeBody) {
- setState(CLOSED_BY_PEER);
- code = closeBody->getReplyCode();
- text = closeBody->getReplyText();
- if (onClose) {
- onClose(closeBody->getReplyCode(), closeBody->getReplyText());
- }
+ if (closedBody) {
+ closed();
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
} else {
try {
- in(frame);
- }catch(ChannelException& e){
- closed(e.code, e.toString());
+ next->handle(frame);
+ }
+ catch(const ChannelException& e){
+ QPID_LOG(error, "Channel exception:" << e.what());
+ closed();
+ AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
+ core.out(f);
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
}
}
} else {
@@ -57,69 +70,63 @@
}
}
-void SessionHandler::outgoing(AMQFrame& frame)
-{
- if (getState() == OPEN) {
- frame.setChannel(id);
- out(frame);
- } else if (getState() == CLOSED) {
- throw Exception(QPID_MSG("Channel not open, can't send " << frame));
- } else if (getState() == CLOSED_BY_PEER) {
- throw ChannelException(code, text);
- }
-}
-
-void SessionHandler::open(uint16_t _id)
+/**
+ * Send an attach-style command (see open() and resume()) and wait for the
+ * outcome.
+ * Sets the state to OPENING, sends the command through core.out(), then
+ * waits on the OPEN and CLOSED states.
+ * @throw Exception if the state is not OPEN after the wait.
+ */
+void SessionHandler::attach(const AMQMethodBody& command)
{
- id = _id;
-
setState(OPENING);
- // FIXME aconway 2007-09-19: Need to get this from API.
- AMQFrame f(id, SessionOpenBody(version, 0));
- out(f);
-
+ // NOTE(review): the frame is built with channel 0; presumably core.out()
+ // fills in the real channel - confirm.
+ AMQFrame f(0, command);
+ core.out(f);
std::set<int> states;
states.insert(OPEN);
- states.insert(CLOSED_BY_PEER);
+ states.insert(CLOSED);
waitFor(states);
- if (getState() != OPEN) {
- throw Exception("Failed to open channel.");
- }
+ if (getState() != OPEN)
+ throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
+}
+
+/** Attach by sending session-open carrying the requested detached lifetime. */
+void SessionHandler::open(uint32_t detachedLifetime) {
+ attach(SessionOpenBody(version, detachedLifetime));
}
-void SessionHandler::close()
+/** Attach by sending session-resume for the session id held by the core. */
+void SessionHandler::resume() {
+ attach(SessionResumeBody(version, core.getId()));
+}
+
+/**
+ * Send a detach-style command (see close() and suspend()): set the state to
+ * CLOSING, send the command through core.out(), then wait for CLOSED.
+ */
+void SessionHandler::detach(const AMQMethodBody& command)
{
setState(CLOSING);
- AMQFrame f(id, SessionCloseBody(version));
- out(f);
+ AMQFrame f(0, command);
+ core.out(f);
waitFor(CLOSED);
}
-void SessionHandler::closed(uint16_t code, const std::string& msg)
-{
- setState(CLOSED);
- AMQFrame f(id, SessionClosedBody(version, code, msg));
- out(f);
-}
+/** Detach by sending session-close. */
+void SessionHandler::close() { detach(SessionCloseBody(version)); }
+/** Detach by sending session-suspend. */
+void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
+/** Mark the handler CLOSED; sends nothing to the peer. */
+void SessionHandler::closed() { setState(CLOSED); }
+/**
+ * Process a session method according to the current state:
+ * - OPENING: session-attached stores the session id in the core and moves
+ *   to OPEN; any other method throws ChannelErrorException.
+ * - CLOSING: session-closed or session-detached moves to CLOSED; any other
+ *   method is ignored.
+ * - CLOSED: always throws ChannelErrorException.
+ * - any other state: asserts, then throws InternalErrorException.
+ */
void SessionHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
- if (method->isA<SessionAttachedBody>()) {
- setState(OPEN);
- } else {
- throw ConnectionException(504, "Channel not opened.");
- }
- break;
+ case OPENING: {
+ SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
+ if (attached) {
+ core.setId(attached->getSessionId());
+ setState(OPEN);
+ } else
+ throw ChannelErrorException();
+ break;
+ }
case CLOSING:
- if (method->isA<SessionClosedBody>()) {
- setState(CLOSED);
- } //else just ignore it
+ if (method->isA<SessionClosedBody>() ||
+ method->isA<SessionDetachedBody>())
+ closed();
break;
+
case CLOSED:
- throw ConnectionException(504, "Channel is closed.");
+ throw ChannelErrorException();
+
default:
- throw Exception("Unexpected state encountered in SessionHandler!");
+ // The throw below still runs in builds where assert() is compiled out.
+ assert(0);
+ throw InternalErrorException(QPID_MSG("Internal Error."));
}
}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h Fri Sep 28 09:21:34 2007
@@ -22,36 +22,40 @@
#define _SessionHandler_
#include "StateManager.h"
-#include "ChainableFrameHandler.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/shared_ptr.h"
namespace qpid {
namespace client {
+class SessionCore;
-class SessionHandler : private StateManager, public ChainableFrameHandler
+/**
+ * Handles incoming session (L2) commands.
+ *
+ * Tracks the attach/detach life cycle of a session (OPENING, OPEN, CLOSING,
+ * CLOSED) and sends outgoing session commands through the SessionCore it
+ * was constructed with.
+ */
+class SessionHandler : public framing::FrameHandler,
+ private StateManager
{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
- framing::ProtocolVersion version;
- uint16_t id;
+ enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+ /** Session core supplied at construction; held by reference. */
+ SessionCore& core;
- uint16_t code;
- std::string text;
-
+ /** Process a session method according to the current state. */
void handleMethod(framing::AMQMethodBody* method);
- void closed(uint16_t code, const std::string& msg);
-
-public:
- typedef boost::function<void(uint16_t, const std::string&)> CloseListener;
-
- SessionHandler();
+ /** Send an open/resume command and wait for OPEN or CLOSED. */
+ void attach(const framing::AMQMethodBody&);
+ /** Send a close/suspend command and wait for CLOSED. */
+ void detach(const framing::AMQMethodBody&);
+
+ public:
+ /** explicit: a SessionCore must never convert implicitly to a handler. */
+ explicit SessionHandler(SessionCore& parent);
+ ~SessionHandler();
- void incoming(framing::AMQFrame& frame);
- void outgoing(framing::AMQFrame& frame);
+ /** Incoming from broker */
+ void handle(framing::AMQFrame&);
- void open(uint16_t id);
+ /** Attach with session-open, requesting the given detached lifetime. */
+ void open(uint32_t detachedLifetime);
+ /** Attach with session-resume for the core's session id. */
+ void resume();
+ /** Detach with session-close. */
void close();
-
- CloseListener onClose;
+ /** Mark the handler CLOSED without sending anything. */
+ void closed();
+ /** Detach with session-suspend. */
+ void suspend();
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Fri Sep 28 09:21:34 2007
@@ -113,7 +113,16 @@
stmt_.log(QPID_LOG_STRINGSTREAM(message)); \
} while(0)
-
+/**
+ * Macro for complicated logging logic that can't fit in a simple QPID_LOG
+ * statement. For example:
+ * @code
+ * QPID_IF_LOG(debug) {
+ * message = do_complicated_stuff;
+ * QPID_LOG(debug, message);
+ * }
+ * @endcode
+ *
+ * NOTE(review): as defined here the macro expands to nothing, so the block
+ * that follows it always executes whatever the log level - confirm whether
+ * a level test was intended.
+ */
+#define QPID_IF_LOG(level)
}} // namespace qpid::log
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Sep 28 09:21:34 2007
@@ -60,6 +60,8 @@
CPPUNIT_TEST(testQueueQuery);
CPPUNIT_TEST(testTransfer);
CPPUNIT_TEST(testDispatcher);
+ CPPUNIT_TEST(testSuspendResume);
+ CPPUNIT_TEST(testSuspendResumeErrors);
CPPUNIT_TEST_SUITE_END();
boost::shared_ptr<Connector> broker;
@@ -139,6 +141,28 @@
}
void testSuspendResume() {
+ // A suspended session must reject commands until it is resumed.
+ // NOTE(review): 60 is presumably the detached lifetime - confirm.
+ session = connection.newSession(60);
+ session.suspend();
+ // CPPUNIT_FAIL reports failure by throwing, so it must not sit inside a
+ // try whose catch(...) would swallow it: record the outcome and assert
+ // outside the try block.
+ bool threw = false;
+ try {
+ session.exchangeQuery_(name="amq.fanout");
+ } catch(...) {
+ threw = true;
+ }
+ CPPUNIT_ASSERT_MESSAGE("Expected session suspended exception", threw);
+ connection.resume(session);
+ session.exchangeQuery_(name="amq.fanout");
+ // FIXME aconway 2007-09-25: build up session state and confirm
+ //it survives the resume
+ }
+
+ // A session suspended with a 0 timeout must reject commands and must
+ // not be resumable.
+ void testSuspendResumeErrors() {
+ session.suspend(); // session has 0 timeout.
+ // Assert outside the try blocks: CPPUNIT_FAIL throws, and a
+ // catch(...) around it would silently swallow the failure.
+ bool queryThrew = false;
+ try {
+ session.exchangeQuery_(name="amq.fanout");
+ } catch(...) {
+ queryThrew = true;
+ }
+ CPPUNIT_ASSERT_MESSAGE("Expected suspended session exception", queryThrew);
+ bool resumeThrew = false;
+ try {
+ connection.resume(session);
+ } catch(...) {
+ resumeThrew = true;
+ }
+ CPPUNIT_ASSERT_MESSAGE("Expected no such session exception.", resumeThrew);
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=580403&r1=580402&r2=580403&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Fri Sep 28 09:21:34 2007
@@ -24,6 +24,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/client/Connection.h"
+#include "qpid/log/Statement.h"
#include <vector>
#include <iostream>
@@ -101,7 +102,8 @@
) : sender(sender_), conversation(conversation_), in(ih) {}
+ /**
+ * Log the frame at debug level tagged with its sender, append it to the
+ * recorded conversation, then pass it to the input handler.
+ */
void send(framing::AMQFrame& frame) {
- //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl;
+ QPID_LOG(debug,
+ (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame);
conversation.push_back(TaggedFrame(sender, frame));
in->received(frame);
}