You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/18 17:20:29 UTC
svn commit: r576947 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
broker/SessionHandler.cpp broker/SessionHandler.h framing/Handler.h
Author: aconway
Date: Tue Sep 18 08:20:29 2007
New Revision: 576947
URL: http://svn.apache.org/viewvc?rev=576947&view=rev
Log:
* src/qpid/broker/SessionHandler.cpp:
- Make SessionHandler an InOutHandler.
- SessionHandler::out sets channel ID on frames.
* src/qpid/framing/Handler.h: Fixed InOutHandler template.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=576947&r1=576946&r2=576947&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Sep 18 08:20:29 2007
@@ -40,11 +40,7 @@
}
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : connection(c), channel(ch), ignoring(false)
-{
- in = this;
- out = &c.getOutput();
-}
+ : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), ignoring(false), channelHandler(*this) {}
SessionHandler::~SessionHandler() {}
@@ -53,7 +49,7 @@
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::handle(AMQFrame& f) {
+void SessionHandler::handleIn(AMQFrame& f) {
// Note on channel states: a channel is open if session != 0. A
// channel that is closed (session == 0) can be in the "ignoring"
// state. This is a temporary state after we have sent a channel
@@ -62,7 +58,7 @@
//
AMQMethodBody* m=f.getMethod();
try {
- if (m && m->invoke(static_cast<Invocable*>(this)))
+ if (m && m->invoke(&channelHandler))
return;
else if (session)
session->in(f);
@@ -82,6 +78,11 @@
}
}
+void SessionHandler::handleOut(AMQFrame& f) {
+ f.setChannel(getChannel());
+ out.next->handle(f);
+}
+
void SessionHandler::assertOpen(const char* method) {
if (!session)
throw ChannelErrorException(
@@ -99,21 +100,21 @@
<< getChannel()));
}
-void SessionHandler::open(const string& /*outOfBand*/){
- assertClosed("open");
- session.reset(new Session(*this, 0));
- getProxy().getChannel().openOk();
+void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+ parent.assertClosed("open");
+ parent.session.reset(new Session(parent, 0));
+ parent.getProxy().getChannel().openOk();
}
// FIXME aconway 2007-08-31: flow is no longer in the spec.
-void SessionHandler::flow(bool active){
- session->flow(active);
- getProxy().getChannel().flowOk(active);
+void SessionHandler::ChannelMethods::flow(bool active){
+ parent.session->flow(active);
+ parent.getProxy().getChannel().flowOk(active);
}
-void SessionHandler::flowOk(bool /*active*/){}
+void SessionHandler::ChannelMethods::flowOk(bool /*active*/){}
-void SessionHandler::close(uint16_t replyCode,
+void SessionHandler::ChannelMethods::close(uint16_t replyCode,
const string& replyText,
uint16_t classId, uint16_t methodId)
{
@@ -123,21 +124,21 @@
<<replyText << ","
<< "classid=" <<classId<< ","
<< "methodid=" <<methodId);
- ignoring=false;
- getProxy().getChannel().closeOk();
+ parent.ignoring=false;
+ parent.getProxy().getChannel().closeOk();
// FIXME aconway 2007-08-31: sould reset session BEFORE
// sending closeOK to avoid races. SessionHandler
// needs its own private proxy, see getProxy() above.
- session.reset();
+ parent.session.reset();
// No need to remove from connection map, will be re-used
// if channel is re-opened.
}
-void SessionHandler::closeOk(){
- ignoring=false;
+void SessionHandler::ChannelMethods::closeOk(){
+ parent.ignoring=false;
}
-void SessionHandler::ok()
+void SessionHandler::ChannelMethods::ok()
{
//no specific action required, generic response handling should be
//sufficient
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=576947&r1=576946&r2=576947&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Sep 18 08:20:29 2007
@@ -44,20 +44,12 @@
*
* SessionHandlers can be stored in a map by value.
*/
-class SessionHandler :
- public framing::FrameHandler::Chains,
- private framing::FrameHandler,
- private framing::AMQP_ServerOperations::ChannelHandler
+class SessionHandler : public framing::FrameHandler::InOutHandler
{
public:
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
- /** Handle AMQP session methods, pass other frames to the session
- * if there is one. Frames channel must be == getChannel()
- */
- void handle(framing::AMQFrame&);
-
/** Returns 0 if not attached to a session */
Session* getSession() const { return session.get(); }
@@ -65,28 +57,40 @@
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
+ protected:
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
+
private:
+ // FIXME aconway 2007-08-31: Move to session methods.
+ struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler {
+ SessionHandler& parent;
+
+ ChannelMethods(SessionHandler& p) : parent(p) {}
+ void open(const std::string& outOfBand);
+ void flow(bool active);
+ void flowOk(bool active);
+ void ok( );
+ void ping( );
+ void pong( );
+ void resume( const std::string& channelId );
+ void close(uint16_t replyCode,
+ const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk();
+ };
+ friend class ChannelMethods;
+
void assertOpen(const char* method);
void assertClosed(const char* method);
framing::AMQP_ClientProxy& getProxy();
- // FIXME aconway 2007-08-31: Replace channel commands with session.
- void open(const std::string& outOfBand);
- void flow(bool active);
- void flowOk(bool active);
- void ok( );
- void ping( );
- void pong( );
- void resume( const std::string& channelId );
- void close(uint16_t replyCode, const
- std::string& replyText, uint16_t classId, uint16_t methodId);
- void closeOk();
-
Connection& connection;
const framing::ChannelId channel;
shared_ptr<Session> session;
bool ignoring;
+ ChannelMethods channelHandler;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=576947&r1=576946&r2=576947&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Tue Sep 18 08:20:29 2007
@@ -22,6 +22,7 @@
*
*/
#include "qpid/shared_ptr.h"
+#include <boost/type_traits/remove_reference.hpp>
#include <assert.h>
namespace qpid {
@@ -31,6 +32,7 @@
template <class T>
struct Handler {
typedef T HandledType;
+ typedef void handleFptr(T);
Handler(Handler<T>* next_=0) : next(next_) {}
virtual ~Handler() {}
@@ -74,37 +76,40 @@
};
/** Adapt a member function of X as a Handler.
- * MemFun<X, X::f> will copy x.
- * MemFun<X&, X::f> will only take a reference to x.
+ * Only holds a reference to its target, not a copy.
*/
- template <class X, void(*M)(T)>
- class MemFun : public Handler<T> {
+ template <class X, void (X::*F)(T)>
+ class MemFunRef : public Handler<T> {
public:
- MemFun(X x, Handler<T>* next=0) : Handler(next), object(x) {}
- void handle(T t) { object.*M(t); }
+ MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(x) {}
+ void handle(T t) { (target.*F)(t); }
+
+ /** Allow calling with -> syntax, compatible with Chains */
+ MemFunRef* operator->() { return this; }
+
private:
- X object;
+ X& target;
};
- /** Support for implementing an in-out handler pair as a single class.
- * Public interface is Handler<T>::Chains pair, but implementation
- * overrides handleIn, handleOut functions in a single class.
+ /** Interface for a handler that implements a
+ * pair of in/out handle operations.
+ * @see InOutHandler
*/
- class InOutHandler {
+ class InOutHandlerInterface {
public:
- virtual ~InOutHandler() {}
-
- InOutHandler() :
- in(*this, &InOutHandler::handleIn),
- out(*this, &InOutHandler::handleOut) {}
-
- MemFun<InOutHandler, &InOutHandler::handleIn> in;
- MemFun<InOutHandler, &InOutHandler::handleOut> out;
-
- protected:
+ virtual ~InOutHandlerInterface() {}
virtual void handleIn(T) = 0;
virtual void handleOut(T) = 0;
- private:
+ };
+
+ /** Support for implementing an in-out handler pair as a single class.
+ * Public interface is Handler<T>::Chains pair, but implementation
+ * overrides handleIn, handleOut functions in a single class.
+ */
+ struct InOutHandler : protected InOutHandlerInterface {
+ InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {}
+ MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in;
+ MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out;
};
};