You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/20 00:34:18 UTC
svn commit: r577459 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/client/
Author: aconway
Date: Wed Sep 19 15:34:11 2007
New Revision: 577459
URL: http://svn.apache.org/viewvc?rev=577459&view=rev
Log:
AMQP 0-10 Session suppported on broker and client.
Client always uses session on the wire but client::Channel API is
still available until all C++ tests are migrated.
Broker allows both session and channel connection to support python
tests.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
- copied, changed from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
- copied, changed from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 19 15:34:11 2007
@@ -218,7 +218,7 @@
qpid/client/MessageListener.cpp \
qpid/client/Correlator.cpp \
qpid/client/CompletionTracker.cpp \
- qpid/client/ChannelHandler.cpp \
+ qpid/client/SessionHandler.cpp \
qpid/client/ConnectionHandler.cpp \
qpid/client/ExecutionHandler.cpp \
qpid/client/FutureCompletion.cpp \
@@ -308,7 +308,7 @@
qpid/client/BlockingQueue.h \
qpid/client/Correlator.h \
qpid/client/CompletionTracker.h \
- qpid/client/ChannelHandler.h \
+ qpid/client/SessionHandler.h \
qpid/client/ChainableFrameHandler.h \
qpid/client/ConnectionHandler.h \
qpid/client/Execution.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Wed Sep 19 15:34:11 2007
@@ -60,6 +60,7 @@
: adapter(&a),
broker(adapter->getConnection().broker),
timeout(t),
+ id(true),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Wed Sep 19 15:34:11 2007
@@ -34,6 +34,7 @@
#include "TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
@@ -96,6 +97,7 @@
SessionHandler* adapter;
Broker& broker;
uint32_t timeout;
+ framing::Uuid id;
boost::ptr_vector<framing::FrameHandler> handlers;
DeliveryAdapter* deliveryAdapter;
@@ -135,8 +137,10 @@
Broker& getBroker() const { return broker; }
- /** Session timeout. */
+ /** Session timeout, aka detached-lifetime. */
uint32_t getTimeout() const { return timeout; }
+ /** Session ID */
+ const framing::Uuid& getId() const { return id; }
/**
* Get named queue, never returns 0.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Sep 19 15:34:11 2007
@@ -28,11 +28,13 @@
namespace qpid {
namespace broker {
using namespace framing;
+using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
connection(c), channel(ch), proxy(out),
- ignoring(false), channelHandler(*this) {}
+ ignoring(false), channelHandler(*this),
+ useChannelClose(false) {}
SessionHandler::~SessionHandler() {}
@@ -50,18 +52,22 @@
//
AMQMethodBody* m=f.getMethod();
try {
- if (m && m->invoke(&channelHandler))
+ if (m && (m->invoke(this) || m->invoke(&channelHandler)))
return;
else if (session)
session->in(f);
else if (!ignoring)
throw ChannelErrorException(
QPID_MSG("Channel " << channel << " is not open"));
- } catch(const ChannelException& e){
- getProxy().getChannel().close(
- e.code, e.toString(), classId(m), methodId(m));
- session.reset();
+ } catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
+ session.reset();
+ // FIXME aconway 2007-09-19: Dual-mode hack.
+ if (useChannelClose)
+ getProxy().getChannel().close(
+ e.code, e.toString(), classId(m), methodId(m));
+ else
+ getProxy().getSession().closed(e.code, e.toString());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -93,6 +99,7 @@
}
void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+ parent.useChannelClose=true;
parent.assertClosed("open");
parent.session.reset(new Session(parent, 0));
parent.getProxy().getChannel().openOk();
@@ -112,7 +119,7 @@
{
// FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
// to text names.
- QPID_LOG(warning, "Received session.close("<<replyCode<<","
+ QPID_LOG(warning, "Received channel.close("<<replyCode<<","
<<replyText << ","
<< "classid=" <<classId<< ","
<< "methodid=" <<methodId);
@@ -134,6 +141,62 @@
{
//no specific action required, generic response handling should be
//sufficient
+}
+
+void SessionHandler::open(uint32_t detachedLifetime) {
+ assertClosed("open");
+ session.reset(new Session(*this, detachedLifetime));
+ getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
+void SessionHandler::flow(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::flowOk(bool /*active*/) {
+ // FIXME aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::close() {
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session.reset();
+ getProxy().getSession().closed(REPLY_SUCCESS, "ok");
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
+ // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+ // to text names.
+ QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+ ignoring=false;
+ session.reset();
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionHandler::resume(const Uuid& /*sessionId*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::suspend() {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
+ const SequenceNumberSet& /*seenFrameSet*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+ assert(0); throw NotImplementedException();
+}
+
+void SessionHandler::solicitAck() {
+ assert(0); throw NotImplementedException();
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Sep 19 15:34:11 2007
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSIONADAPTER_H
-#define QPID_BROKER_SESSIONADAPTER_H
+#ifndef QPID_BROKER_SESSIONHANDLER_H
+#define QPID_BROKER_SESSIONHANDLER_H
/*
*
@@ -40,7 +40,8 @@
*
* SessionHandlers can be stored in a map by value.
*/
-class SessionHandler : public framing::FrameHandler::InOutHandler
+class SessionHandler : public framing::FrameHandler::InOutHandler,
+ private framing::AMQP_ServerOperations::SessionHandler
{
public:
SessionHandler(Connection&, framing::ChannelId);
@@ -63,7 +64,7 @@
void handleOut(framing::AMQFrame&);
private:
- // FIXME aconway 2007-08-31: Move to session methods.
+ // FIXME aconway 2007-08-31: Drop channel.
struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler {
SessionHandler& parent;
@@ -81,7 +82,21 @@
void closeOk();
};
friend class ChannelMethods;
-
+
+ /// Session methods
+ void open(uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void close();
+ void closed(uint16_t replyCode, const std::string& replyText);
+ void resume(const framing::Uuid& sessionId);
+ void suspend();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+
+
void assertOpen(const char* method);
void assertClosed(const char* method);
@@ -91,8 +106,11 @@
shared_ptr<Session> session;
bool ignoring;
ChannelMethods channelHandler;
+ bool useChannelClose; // FIXME aconway 2007-09-19: remove with channel.
};
}} // namespace qpid::broker
-#endif /*!QPID_BROKER_SESSIONADAPTER_H*/
+
+
+#endif /*!QPID_BROKER_SESSIONHANDLER_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 19 15:34:11 2007
@@ -33,7 +33,7 @@
{
l2.out = boost::bind(&FrameHandler::handle, out, _1);
l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
- l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1);
+ l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1);
l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Wed Sep 19 15:34:11 2007
@@ -28,7 +28,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
-#include "ChannelHandler.h"
+#include "SessionHandler.h"
#include "ExecutionHandler.h"
namespace qpid {
@@ -45,7 +45,7 @@
};
ExecutionHandler l3;
- ChannelHandler l2;
+ SessionHandler l2;
const uint16_t id;
bool sync;
bool isClosed;
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp (from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp&r1=577297&r2=577459&rev=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp Wed Sep 19 15:34:11 2007
@@ -19,7 +19,7 @@
*
*/
-#include "ChannelHandler.h"
+#include "SessionHandler.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
@@ -27,14 +27,14 @@
using namespace qpid::framing;
using namespace boost;
-ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
+SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
-void ChannelHandler::incoming(AMQFrame& frame)
+void SessionHandler::incoming(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- ChannelCloseBody* closeBody=
- dynamic_cast<ChannelCloseBody*>(body->getMethod());
+ SessionClosedBody* closeBody=
+ dynamic_cast<SessionClosedBody*>(body->getMethod());
if (closeBody) {
setState(CLOSED_BY_PEER);
code = closeBody->getReplyCode();
@@ -46,12 +46,7 @@
try {
in(frame);
}catch(ChannelException& e){
- AMQMethodBody* method=body->getMethod();
- if (method)
- close(e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- else
- close(e.code, e.toString(), 0, 0);
+ closed(e.code, e.toString());
}
}
} else {
@@ -62,7 +57,7 @@
}
}
-void ChannelHandler::outgoing(AMQFrame& frame)
+void SessionHandler::outgoing(AMQFrame& frame)
{
if (getState() == OPEN) {
frame.setChannel(id);
@@ -74,12 +69,12 @@
}
}
-void ChannelHandler::open(uint16_t _id)
+void SessionHandler::open(uint16_t _id)
{
id = _id;
setState(OPENING);
- AMQFrame f(id, ChannelOpenBody(version));
+ AMQFrame f(id, SessionOpenBody(version));
out(f);
std::set<int> states;
@@ -91,37 +86,39 @@
}
}
-void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+void SessionHandler::close()
{
setState(CLOSING);
- AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
+ AMQFrame f(id, SessionCloseBody(version));
out(f);
+ waitFor(CLOSED);
}
-void ChannelHandler::close()
+void SessionHandler::closed(uint16_t code, const std::string& msg)
{
- close(200, "OK", 0, 0);
- waitFor(CLOSED);
+ setState(CLOSED);
+ AMQFrame f(id, SessionClosedBody(version, code, msg));
+ out(f);
}
-void ChannelHandler::handleMethod(AMQMethodBody* method)
+void SessionHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
case OPENING:
- if (method->isA<ChannelOpenOkBody>()) {
+ if (method->isA<SessionAttachedBody>()) {
setState(OPEN);
} else {
throw ConnectionException(504, "Channel not opened.");
}
break;
case CLOSING:
- if (method->isA<ChannelCloseOkBody>()) {
+ if (method->isA<SessionClosedBody>()) {
setState(CLOSED);
} //else just ignore it
break;
case CLOSED:
throw ConnectionException(504, "Channel is closed.");
default:
- throw Exception("Unexpected state encountered in ChannelHandler!");
+ throw Exception("Unexpected state encountered in SessionHandler!");
}
}
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h (from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h&r1=577297&r2=577459&rev=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h Wed Sep 19 15:34:11 2007
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _ChannelHandler_
-#define _ChannelHandler_
+#ifndef _SessionHandler_
+#define _SessionHandler_
#include "StateManager.h"
#include "ChainableFrameHandler.h"
@@ -28,7 +28,7 @@
namespace qpid {
namespace client {
-class ChannelHandler : private StateManager, public ChainableFrameHandler
+class SessionHandler : private StateManager, public ChainableFrameHandler
{
enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
framing::ProtocolVersion version;
@@ -38,21 +38,19 @@
std::string text;
void handleMethod(framing::AMQMethodBody* method);
-
- void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
-
+ void closed(uint16_t code, const std::string& msg);
public:
typedef boost::function<void(uint16_t, const std::string&)> CloseListener;
- ChannelHandler();
+ SessionHandler();
void incoming(framing::AMQFrame& frame);
void outgoing(framing::AMQFrame& frame);
void open(uint16_t id);
void close();
-
+
CloseListener onClose;
};