You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/21 18:40:49 UTC
svn commit: r629883 [1/2] - in /incubator/qpid/trunk/qpid:
cpp/rubygen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/
cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/ specs/
Author: gsim
Date: Thu Feb 21 09:40:42 2008
New Revision: 629883
URL: http://svn.apache.org/viewvc?rev=629883&view=rev
Log:
Start moving towards final 0-10 spec:
* marked preview spec as 99-0 to distinguish it from 0-10 (which will now be used for the final version)
* modified python client to treat 99-0 as 0-10 for now
* modified broker to have two paths for the two different versions: 99-0 uses PreviewConnection, PreviewConnectionHandler
and PreviewSessionHandler, which are straight copies of the Connection, ConnectionHandler and SessionHandler classes now
associated with 0-10 (so that we can migrate the 0-10 path to the final spec without affecting clients that work with the
preview version)
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Thu Feb 21 09:40:42 2008
@@ -85,8 +85,8 @@
}
cpp_class(@classname, "public SessionBase") {
public
- genl "Session_0_10() {}"
- genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}"
+ genl "Session_#{@amqp.version.bars}() {}"
+ genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}"
session_methods.each { |m|
genl
doxygen(m)
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Feb 21 09:40:42 2008
@@ -163,6 +163,9 @@
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
+ qpid/broker/PreviewConnection.cpp \
+ qpid/broker/PreviewConnectionHandler.cpp \
+ qpid/broker/PreviewSessionHandler.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -186,6 +189,7 @@
qpid/broker/MessageDelivery.cpp \
qpid/broker/MessageHandlerImpl.cpp \
qpid/broker/MessageStoreModule.cpp \
+ qpid/broker/MultiVersionConnectionInputHandler.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/QueueBindings.cpp \
@@ -201,6 +205,7 @@
qpid/broker/SessionManager.h \
qpid/broker/SessionManager.cpp \
qpid/broker/SessionHandler.h \
+ qpid/broker/SessionContext.h \
qpid/broker/SessionHandler.cpp \
qpid/broker/SemanticHandler.cpp \
qpid/broker/Timer.cpp \
@@ -262,7 +267,11 @@
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
+ qpid/broker/PreviewConnection.h \
+ qpid/broker/PreviewConnectionHandler.h \
+ qpid/broker/PreviewSessionHandler.h \
qpid/broker/Connection.h \
+ qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
qpid/broker/ConnectionHandler.h \
qpid/broker/ConnectionToken.h \
@@ -293,6 +302,7 @@
qpid/broker/MessageHandlerImpl.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/MultiVersionConnectionInputHandler.h \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.h \
qpid/broker/Persistable.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Feb 21 09:40:42 2008
@@ -19,7 +19,7 @@
*
*/
#include "Bridge.h"
-#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
@@ -31,7 +31,7 @@
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, Connection& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
+Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
args(_args), channel(id, &(c.getOutput())), peer(channel),
mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
connection(c), listener(l)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Feb 21 09:40:42 2008
@@ -32,14 +32,14 @@
namespace qpid {
namespace broker {
-class Connection;
+class ConnectionState;
class Bridge : public management::Manageable
{
public:
typedef boost::function<void(Bridge*)> CancellationListener;
- Bridge(framing::ChannelId id, Connection& c, CancellationListener l,
+ Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
const management::ArgsLinkBridge& args);
~Bridge();
@@ -54,7 +54,7 @@
framing::ChannelHandler channel;
framing::AMQP_ServerProxy peer;
management::Bridge::shared_ptr mgmtObject;
- Connection& connection;
+ ConnectionState& connection;
CancellationListener listener;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Feb 21 09:40:42 2008
@@ -86,13 +86,7 @@
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
- broker(broker_),
- outputTasks(*out_),
- out(out_),
- framemax(65535),
- heartbeat(0),
- client(0),
- stagingThreshold(broker.getStagingThreshold()),
+ ConnectionState(out_, broker_),
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
@@ -226,17 +220,6 @@
}
return status;
-}
-
-void Connection::setUserId(const string& uid)
-{
- userId = uid;
- QPID_LOG (debug, "UserId is " << userId);
-}
-
-const string& Connection::getUserId() const
-{
- return userId;
}
Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Feb 21 09:40:42 2008
@@ -39,6 +39,7 @@
#include "qpid/sys/Socket.h"
#include "qpid/Exception.h"
#include "ConnectionHandler.h"
+#include "ConnectionState.h"
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
@@ -50,8 +51,7 @@
namespace broker {
class Connection : public sys::ConnectionInputHandler,
- public ConnectionToken,
- public management::Manageable
+ public ConnectionState
{
public:
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
@@ -63,25 +63,6 @@
/** Close the connection */
void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
- sys::ConnectionOutputHandler& getOutput() const { return *out; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint64_t getStagingThreshold() const { return stagingThreshold; }
-
- void setFrameMax(uint32_t fm) { framemax = fm; }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
- void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
-
- Broker& getBroker() { return broker; }
-
- Broker& broker;
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
- //contained output tasks
- sys::AggregateOutput outputTasks;
-
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
void initiated(const framing::ProtocolInitiation& header);
@@ -98,9 +79,6 @@
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- void setUserId(const string& uid);
- const string& getUserId() const;
-
void initMgmt(bool asLink = false);
private:
@@ -126,17 +104,11 @@
class MgmtClient;
class MgmtLink;
- framing::ProtocolVersion version;
ChannelMap channels;
- sys::ConnectionOutputHandler* out;
- uint32_t framemax;
- uint16_t heartbeat;
framing::AMQP_ClientProxy::Connection* client;
- uint64_t stagingThreshold;
ConnectionHandler adapter;
std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
- string userId;
const std::string mgmtId;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Thu Feb 21 09:40:42 2008
@@ -20,6 +20,7 @@
*/
#include "ConnectionFactory.h"
#include "Connection.h"
+#include "MultiVersionConnectionInputHandler.h"
namespace qpid {
namespace broker {
@@ -38,7 +39,7 @@
ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
const std::string& id)
{
- return new Connection(out, broker, id);
+ return new MultiVersionConnectionInputHandler(out, broker, id);
}
}} // namespace qpid::broker
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionState_
+#define _ConnectionState_
+
+#include <vector>
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/management/Manageable.h"
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class ConnectionState : public ConnectionToken, public management::Manageable
+{
+ public:
+ ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ broker(b),
+ outputTasks(*o),
+ out(o),
+ framemax(65535),
+ heartbeat(0),
+ stagingThreshold(broker.getStagingThreshold())
+ {}
+
+
+
+ virtual ~ConnectionState () {}
+
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint64_t getStagingThreshold() const { return stagingThreshold; }
+
+ void setFrameMax(uint32_t fm) { framemax = fm; }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+
+ void setUserId(const string& uid) { userId = uid; }
+ const string& getUserId() const { return userId; }
+
+ Broker& getBroker() { return broker; }
+
+ Broker& broker;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
+
+ sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ protected:
+ framing::ProtocolVersion version;
+ sys::ConnectionOutputHandler* out;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ uint64_t stagingThreshold;
+ string userId;
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Thu Feb 21 09:40:42 2008
@@ -21,6 +21,7 @@
#include "SemanticState.h"
#include "SessionState.h"
+#include "ConnectionState.h"
namespace qpid {
namespace broker {
@@ -39,7 +40,7 @@
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ ConnectionState& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getBroker(); }
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MultiVersionConnectionInputHandler.h"
+#include "Connection.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler(
+ qpid::sys::ConnectionOutputHandler* _out,
+ Broker& _broker,
+ const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {}
+
+
+void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i)
+{
+ if (i.getMajor() == 99 && i.getMinor() == 0) {
+ handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id));
+ } else if (i.getMajor() == 0 && i.getMinor() == 10) {
+ handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id));
+ } else {
+ throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString());
+ }
+ handler->initiated(i);
+}
+
+void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
+{
+ check();
+ handler->received(f);
+}
+
+void MultiVersionConnectionInputHandler::idleOut()
+{
+ check();
+ handler->idleOut();
+}
+
+void MultiVersionConnectionInputHandler::idleIn()
+{
+ check();
+ handler->idleIn();
+}
+
+bool MultiVersionConnectionInputHandler::doOutput()
+{
+ return check(false) && handler->doOutput();
+}
+
+qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
+{
+ return qpid::framing::ProtocolInitiation(linkVersion);
+}
+
+void MultiVersionConnectionInputHandler::closed()
+{
+ check();
+ handler->closed();
+}
+
+bool MultiVersionConnectionInputHandler::check(bool fail)
+{
+ if (!handler.get()) {
+ if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
+ else return false;
+ } else {
+ return true;
+ }
+}
+
+}
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MultiVersionConnectionInputHandler_
+#define _MultiVersionConnectionInputHandler_
+
+#include <memory>
+#include <string>
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHandler
+{
+ qpid::framing::ProtocolVersion linkVersion;//version used for inter-broker links
+ std::auto_ptr<qpid::sys::ConnectionInputHandler> handler;
+ qpid::sys::ConnectionOutputHandler* out;
+ Broker& broker;
+ const std::string id;
+
+ bool check(bool fail = true);
+
+public:
+ MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
+ virtual ~MultiVersionConnectionInputHandler() {}
+
+ void initiated(const qpid::framing::ProtocolInitiation&);
+ void received(qpid::framing::AMQFrame&);
+ void idleOut();
+ void idleIn();
+ bool doOutput();
+ qpid::framing::ProtocolInitiation getInitiation();
+ void closed();
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnection.h"
+#include "SessionState.h"
+#include "BrokerAdapter.h"
+#include "Bridge.h"
+#include "SemanticHandler.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/ptr_map.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <algorithm>
+#include <iostream>
+#include <assert.h>
+
+using namespace boost;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper
+{
+ management::Client::shared_ptr mgmtClient;
+
+public:
+ MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtClient();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+};
+
+class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
+{
+ typedef boost::ptr_vector<Bridge> Bridges;
+
+ management::Link::shared_ptr mgmtLink;
+ Bridges created;//holds list of bridges pending creation
+ Bridges cancelled;//holds list of bridges pending cancellation
+ Bridges active;//holds active bridges
+ uint channelCounter;
+ sys::Mutex lock;
+
+ void cancel(Bridge*);
+
+public:
+ MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtLink();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+ void processPending();
+ void process(PreviewConnection& connection, const management::Args& args);
+};
+
+
+PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
+ ConnectionState(out_, broker_),
+ adapter(*this),
+ mgmtClosing(0),
+ mgmtId(mgmtId_)
+{}
+
+void PreviewConnection::initMgmt(bool asLink)
+{
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ if (asLink) {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ } else {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ }
+ }
+ }
+}
+
+PreviewConnection::~PreviewConnection () {}
+
+void PreviewConnection::received(framing::AMQFrame& frame){
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+
+ if (frame.getChannel() == 0) {
+ adapter.handle(frame);
+ } else {
+ getChannel(frame.getChannel()).in(frame);
+ }
+
+ if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+}
+
+void PreviewConnection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ adapter.close(code, text, classId, methodId);
+ channels.clear();
+ getOutput().close();
+}
+
+void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
+ version = ProtocolVersion(header.getMajor(), header.getMinor());
+ adapter.init(header);
+ initMgmt();
+}
+
+void PreviewConnection::idleOut(){}
+
+void PreviewConnection::idleIn(){}
+
+void PreviewConnection::closed(){ // Physically closed, suspend open sessions.
+ try {
+ for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
+ get_pointer(i)->localSuspend();
+ while (!exclusiveQueues.empty()) {
+ Queue::shared_ptr q(exclusiveQueues.front());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete()) {
+ Queue::tryAutoDelete(broker, q);
+ }
+ exclusiveQueues.erase(exclusiveQueues.begin());
+ }
+ } catch(std::exception& e) {
+ QPID_LOG(error, " Unhandled exception while closing session: " <<
+ e.what());
+ assert(0);
+ }
+}
+
+bool PreviewConnection::doOutput()
+{
+ try{
+ //process any pending mgmt commands:
+ if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+ //then do other output as needed:
+ return outputTasks.doOutput();
+ }catch(ConnectionException& e){
+ close(e.code, e.what(), 0, 0);
+ }catch(std::exception& e){
+ close(541/*internal error*/, e.what(), 0, 0);
+ }
+ return false;
+}
+
+void PreviewConnection::closeChannel(uint16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end()) channels.erase(i);
+}
+
+PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) {
+ ChannelMap::iterator i=channels.find(id);
+ if (i == channels.end()) {
+ i = channels.insert(id, new PreviewSessionHandler(*this, id)).first;
+ }
+ return *get_pointer(i);
+}
+
+ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const
+{
+ return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+}
+
+Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
+ Args& args)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Client::METHOD_CLOSE :
+ mgmtClosing = 1;
+ if (mgmtWrapper.get()) mgmtWrapper->closing();
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Link::METHOD_BRIDGE :
+ //queue this up and request chance to do output (i.e. get connections thread of control):
+ mgmtWrapper->process(*this, args);
+ out->activateOutput();
+ status = Manageable::STATUS_OK;
+ break;
+ }
+
+ return status;
+}
+
+PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+ : channelCounter(1)
+{
+ mgmtLink = management::Link::shared_ptr
+ (new management::Link(conn, parent, mgmtId));
+ agent->addObject (mgmtLink);
+}
+
+PreviewConnection::MgmtLink::~MgmtLink()
+{
+ if (mgmtLink.get () != 0)
+ mgmtLink->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame)
+{
+ if (mgmtLink.get () != 0)
+ {
+ mgmtLink->inc_framesFromPeer ();
+ mgmtLink->inc_bytesFromPeer (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void PreviewConnection::MgmtLink::closing()
+{
+ if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void PreviewConnection::MgmtLink::processPending()
+{
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create();
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
+{
+ created.push_back(new Bridge(channelCounter++, connection,
+ boost::bind(&MgmtLink::cancel, this, _1),
+ dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void PreviewConnection::MgmtLink::cancel(Bridge* b)
+{
+ //need to take this out the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == b) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+}
+
+PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+ mgmtClient = management::Client::shared_ptr
+ (new management::Client (conn, parent, mgmtId));
+ agent->addObject (mgmtClient);
+}
+
+PreviewConnection::MgmtClient::~MgmtClient()
+{
+ if (mgmtClient.get () != 0)
+ mgmtClient->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame)
+{
+ if (mgmtClient.get () != 0)
+ {
+ mgmtClient->inc_framesFromClient ();
+ mgmtClient->inc_bytesFromClient (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void PreviewConnection::MgmtClient::closing()
+{
+ if (mgmtClient) mgmtClient->set_closing (1);
+}
+
+}}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnection_
+#define _PreviewConnection_
+
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "Broker.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/Exception.h"
+#include "PreviewConnectionHandler.h"
+#include "ConnectionState.h"
+#include "PreviewSessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Broker-side connection object for the preview protocol path.
+ *
+ * Receives frames as a sys::ConnectionInputHandler, carries the shared
+ * per-connection state via ConnectionState, and keeps one
+ * PreviewSessionHandler per channel.
+ */
+class PreviewConnection : public sys::ConnectionInputHandler,
+ public ConnectionState
+{
+ public:
+ PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
+ ~PreviewConnection ();
+
+ /** Get the PreviewSessionHandler for channel. Create if it does not already exist */
+ PreviewSessionHandler& getChannel(framing::ChannelId channel);
+
+ /** Close the connection */
+ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame& frame);
+ void initiated(const framing::ProtocolInitiation& header);
+ void idleOut();
+ void idleIn();
+ void closed();
+ bool doOutput();
+ // Builds a ProtocolInitiation from `version`, which is not declared in
+ // this class (presumably inherited - TODO confirm).
+ framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
+
+ void closeChannel(framing::ChannelId channel);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
+ // NOTE(review): asLink presumably selects the inter-broker link
+ // management wrapper instead of the client one - confirm in the .cpp.
+ void initMgmt(bool asLink = false);
+
+ private:
+ // Channel id -> session handler; boost::ptr_map owns the handlers.
+ typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ /**
+ * Connection may appear, for the purposes of management, as a
+ * normal client initiated connection or as an agent initiated
+ * inter-broker link. This wrapper abstracts the common interface
+ * for both.
+ */
+ class MgmtWrapper
+ {
+ public:
+ virtual ~MgmtWrapper(){}
+ virtual void received(framing::AMQFrame& frame) = 0;
+ virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+ virtual void closing() = 0;
+ // Optional hooks: the default implementations do nothing.
+ virtual void processPending(){}
+ virtual void process(PreviewConnection&, const management::Args&){}
+ };
+ // Concrete wrappers; only forward-declared here.
+ class MgmtClient;
+ class MgmtLink;
+
+ ChannelMap channels;
+ framing::AMQP_ClientProxy::Connection* client;
+ uint64_t stagingThreshold;
+ // Handler for the connection-class methods.
+ PreviewConnectionHandler adapter;
+ // Owned management wrapper (client or link flavour).
+ std::auto_ptr<MgmtWrapper> mgmtWrapper;
+ bool mgmtClosing;
+ const std::string mgmtId;
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,158 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewConnectionHandler.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+
+namespace
+{
+// SASL mechanism name offered in connection.start and sent in start-ok.
+const std::string PLAIN("PLAIN");
+// Locale string offered in connection.start and sent in start-ok.
+const std::string en_US("en_US");
+}
+
+/**
+ * Switch the handler to server mode and send connection.start to the peer,
+ * echoing the protocol version from the received initiation header and
+ * offering the PLAIN mechanism and the en_US locale.
+ */
+void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) {
+    handler->serverMode = true;
+
+    FieldTable serverProperties;
+    string offeredMechanisms(PLAIN);
+    string offeredLocales(en_US);
+    handler->client.start(header.getMajor(), header.getMinor(),
+                          serverProperties, offeredMechanisms, offeredLocales);
+}
+
+/** Send connection.close to the peer with the given reply code, text and class/method ids. */
+void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+    framing::AMQP_ClientProxy::Connection& peer = handler->client;
+    peer.close(code, text, classId, methodId);
+}
+
+/**
+ * Handle a frame addressed to the connection handler.
+ *
+ * The frame's method body is dispatched to the server-side or client-side
+ * connection operations depending on serverMode. Any failure is reported
+ * to the peer with connection.close: ConnectionExceptions with their own
+ * code, every other std::exception as 541 (internal error).
+ *
+ * A frame that carries no method body is rejected instead of being
+ * dereferenced; the class and method ids reported in that case are 0.
+ */
+void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
+{
+    AMQMethodBody* method=frame.getBody()->getMethod();
+    // Ids reported to the peer on failure; 0 when there is no method body.
+    const ClassId failedClass = method ? method->amqpClassId() : 0;
+    const MethodId failedMethod = method ? method->amqpMethodId() : 0;
+    try{
+        if (!method)
+            throw ChannelErrorException(QPID_MSG("Only method frames can be handled on channel 0"));
+        if (handler->serverMode) {
+            if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        } else {
+            if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        }
+    }catch(const ConnectionException& e){
+        handler->client.close(e.code, e.what(), failedClass, failedMethod);
+    }catch(const std::exception& e){
+        handler->client.close(541/*internal error*/, e.what(), failedClass, failedMethod);
+    }
+}
+
+/** Create the handler and its internal Handler bound to the given connection. */
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection)
+    : handler(new Handler(connection))
+{
+}
+
+/**
+ * Both proxies write to the connection's output; the handler starts out
+ * in client mode (serverMode == false) until init() switches it.
+ */
+PreviewConnectionHandler::Handler::Handler(PreviewConnection& c)
+    : client(c.getOutput()),
+      server(c.getOutput()),
+      connection(c),
+      serverMode(false)
+{
+}
+
+/**
+ * Handle connection.start-ok from the peer.
+ *
+ * For the PLAIN mechanism with a response that begins with a NUL byte, the
+ * text up to the next NUL is taken as the user id and recorded on the
+ * connection. The password is extracted but not checked (authentication
+ * is still a TODO). Always replies with connection.tune.
+ */
+void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+                                                const string& mechanism,
+                                                const string& response, const string& /*locale*/)
+{
+    //TODO: handle SASL mechanisms more cleverly
+    const bool plainWithLeadingNul =
+        mechanism == PLAIN && response.size() > 0 && response[0] == (char) 0;
+    if (plainWithLeadingNul) {
+        // Strip the leading NUL, then split "uid NUL pwd" at the separator.
+        string credentials = response.substr(1);
+        string::size_type separator = credentials.find((char)0);
+        string uid = credentials.substr(0, separator);
+        string pwd = credentials.substr(separator + 1);
+        //TODO: authentication
+        connection.setUserId(uid);
+    }
+    client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
+}
+
+/** connection.secure-ok from the peer: currently ignored. */
+void PreviewConnectionHandler::Handler::secureOk(const string& /*response*/)
+{
+}
+
+/**
+ * connection.tune-ok from the peer: record the agreed maximum frame size
+ * and heartbeat on the connection. The channel maximum is not used.
+ */
+void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
+                                               uint32_t framemax,
+                                               uint16_t heartbeat)
+{
+    connection.setFrameMax(framemax);
+    connection.setHeartbeat(heartbeat);
+}
+
+/**
+ * connection.open from the peer: all arguments are ignored and
+ * connection.open-ok is sent with an empty known-hosts string.
+ */
+void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/,
+                                             const string& /*capabilities*/,
+                                             bool /*insist*/)
+{
+    string noKnownHosts;
+    client.openOk(noKnownHosts);
+}
+
+
+/**
+ * connection.close from the peer: acknowledge with close-ok, then close
+ * the connection's output. The reply details are ignored.
+ */
+void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/,
+                                              const string& /*replyText*/,
+                                              uint16_t /*classId*/,
+                                              uint16_t /*methodId*/)
+{
+    client.closeOk();
+    connection.getOutput().close();
+}
+
+/** connection.close-ok from the peer: close the connection's output. */
+void PreviewConnectionHandler::Handler::closeOk()
+{
+    connection.getOutput().close();
+}
+
+
+/**
+ * connection.start from the peer (this side acting as the client).
+ *
+ * Replies with start-ok using the PLAIN mechanism and a NUL-delimited
+ * "\0uid\0pwd" response, then initialises management as a link.
+ * NOTE(review): the credentials are hard-coded to qpidd/qpidd.
+ */
+void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+                                              uint8_t /*versionMinor*/,
+                                              const FieldTable& /*serverProperties*/,
+                                              const string& /*mechanisms*/,
+                                              const string& /*locales*/)
+{
+    string uid = "qpidd";
+    string pwd = "qpidd";
+
+    // Assemble NUL uid NUL pwd piece by piece.
+    string response;
+    response += (char)0;
+    response += uid;
+    response += (char)0;
+    response += pwd;
+
+    server.startOk(FieldTable(), PLAIN, response, en_US);
+    connection.initMgmt(true);
+}
+
+/** connection.secure from the peer: the challenge is ignored and an empty secure-ok is sent. */
+void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+    server.secureOk("");
+}
+
+/**
+ * connection.tune from the peer (this side acting as the client): adopt
+ * the proposed frame size and heartbeat, echo all three values back in
+ * tune-ok, then send connection.open for virtual host "/".
+ */
+void PreviewConnectionHandler::Handler::tune(uint16_t channelMax,
+                                             uint32_t frameMax,
+                                             uint16_t heartbeat)
+{
+    // Record the peer's limits locally first.
+    connection.setFrameMax(frameMax);
+    connection.setHeartbeat(heartbeat);
+
+    // Accept them unchanged and open the connection.
+    server.tuneOk(channelMax, frameMax, heartbeat);
+    server.open("/", "", true);
+}
+
+/** connection.open-ok from the peer: nothing to do. */
+void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/) {}
+
+/** connection.redirect from the peer: not acted upon. */
+void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/) {}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnectionAdapter_
+#define _PreviewConnectionAdapter_
+
+#include <memory>
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+
+// TODO aconway 2007-09-18: Rename to ConnectionHandler
+/**
+ * Frame handler for the connection-class methods of a PreviewConnection.
+ *
+ * The nested Handler implements both the server-side and the client-side
+ * connection operations, so the same object can serve either end of a
+ * connection; serverMode selects which set handle() dispatches to.
+ */
+class PreviewConnectionHandler : public framing::FrameHandler
+{
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
+ {
+ // Proxy for methods sent to the peer when acting as the server.
+ framing::AMQP_ClientProxy::Connection client;
+ // Proxy for methods sent to the peer when acting as the client.
+ framing::AMQP_ServerProxy::Connection server;
+ PreviewConnection& connection;
+ // False on construction; set to true by PreviewConnectionHandler::init().
+ bool serverMode;
+
+ Handler(PreviewConnection& connection);
+ // Server-side operations (methods received from a client).
+ void startOk(const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const std::string& response);
+ void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void open(const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(uint16_t replyCode, const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk();
+
+
+ // Client-side operations (methods received from a server).
+ void start(uint8_t versionMajor,
+ uint8_t versionMinor,
+ const qpid::framing::FieldTable& serverProperties,
+ const std::string& mechanisms,
+ const std::string& locales);
+
+ void secure(const std::string& challenge);
+
+ void tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat);
+
+ void openOk(const std::string& knownHosts);
+
+ void redirect(const std::string& host, const std::string& knownHosts);
+ };
+ // Owned implementation object, created in the constructor.
+ std::auto_ptr<Handler> handler;
+ public:
+ PreviewConnectionHandler(PreviewConnection& connection);
+ /** Enter server mode and send connection.start to the peer. */
+ void init(const framing::ProtocolInitiation& header);
+ /** Send connection.close to the peer. */
+ void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
+ /** Dispatch a frame's method body to the connection operations. */
+ void handle(framing::AMQFrame& frame);
+};
+
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionHandler.h"
+#include "SessionState.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+using namespace framing;
+using namespace std;
+using namespace qpid::sys;
+
+/**
+ * Create the handler for channel ch of connection c. No session is
+ * attached yet and the handler is not in the "ignoring" state.
+ */
+PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
+    : SessionContext(c.getOutput()),
+      connection(c),
+      channel(ch, &c.getOutput()),
+      proxy(out),              // Via my own handleOut() for L2 data.
+      peerSession(channel),    // Direct to channel for L2 commands.
+      ignoring(false)
+{
+}
+
+/** Nothing to release explicitly; members clean up after themselves. */
+PreviewSessionHandler::~PreviewSessionHandler()
+{
+}
+
+namespace {
+/** Class id of method body m, or 0 when there is no method body. */
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+/** Method id of method body m, or 0 when there is no method body. */
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+} // namespace
+
+/**
+ * Handle a frame arriving on this channel.
+ *
+ * Dispatch order: session commands (server operations) first; otherwise,
+ * if a session is attached, the frame is recorded and passed to the
+ * session; otherwise the client-role session commands are tried; a frame
+ * that matches none of these is a channel error unless trailing frames
+ * are currently being ignored.
+ *
+ * A ChannelException closes the session (session.closed is sent to the
+ * peer); a ConnectionException or any other std::exception closes the
+ * whole connection.
+ */
+void PreviewSessionHandler::handleIn(AMQFrame& f) {
+    // Note on channel states: a channel is open if session != 0. A
+    // channel that is closed (session == 0) can be in the "ignoring"
+    // state. This is a temporary state after we have sent a channel
+    // exception, where extra frames might arrive that should be
+    // ignored.
+    //
+    AMQMethodBody* m = f.getBody()->getMethod();
+    try {
+        if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (session.get()) {
+            boost::optional<SequenceNumber> ack=session->received(f);
+            session->in.handle(f);
+            if (ack)
+                peerSession.ack(*ack, SequenceNumberSet());
+        } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (!ignoring) {
+            throw ChannelErrorException(
+                QPID_MSG("Channel " << channel.get() << " is not open"));
+        }
+    } catch(const ChannelException& e) {
+        ignoring=true; // Ignore trailing frames sent by client.
+        // Session commands are dispatched above before any check that a
+        // session is attached, so the exception can arrive with none.
+        if (session.get()) {
+            session->detach();
+            session.reset();
+        }
+        peerSession.closed(e.code, e.what());
+    }catch(const ConnectionException& e){
+        connection.close(e.code, e.what(), classId(m), methodId(m));
+    }catch(const std::exception& e){
+        connection.close(
+            framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+    }
+}
+
+/**
+ * Send an outgoing frame on the channel, then record it with the session
+ * and solicit an ack from the peer when the session asks for one.
+ */
+void PreviewSessionHandler::handleOut(AMQFrame& f)
+{
+    // Send it.
+    channel.handle(f);
+
+    if (session->sent(f)) {
+        peerSession.solicitAck();
+    }
+}
+
+/**
+ * Throw ChannelErrorException, naming the calling method, unless a session
+ * is attached to this channel.
+ */
+void PreviewSessionHandler::assertAttached(const char* method) const
+{
+    if (session.get())
+        return;
+
+    throw ChannelErrorException(
+        QPID_MSG(method << " failed: No session for channel "
+                 << getChannel()));
+}
+
+/**
+ * Throw ChannelBusyException, naming the calling method, if a session is
+ * already attached to this channel.
+ */
+void PreviewSessionHandler::assertClosed(const char* method) const
+{
+    if (!session.get())
+        return;
+
+    throw ChannelBusyException(
+        QPID_MSG(method << " failed: channel " << channel.get()
+                 << " is already open."));
+}
+
+/**
+ * session.open: requires that no session is attached. Opens a new session
+ * through the broker's session manager, attaches it to this handler and
+ * tells the peer its id and timeout.
+ */
+void PreviewSessionHandler::open(uint32_t detachedLifetime)
+{
+    assertClosed("open");
+
+    std::auto_ptr<SessionState> opened(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    // Take over ownership of the newly opened session.
+    session.reset(opened.release());
+
+    peerSession.attached(session->getId(), session->getTimeout());
+}
+
+/**
+ * session.resume: requires that no session is attached. Retrieves the
+ * session with the given id from the session manager, attaches it to this
+ * handler, reports it to the peer and acks up to the sequence number
+ * returned by the session's resuming() call.
+ */
+void PreviewSessionHandler::resume(const Uuid& id)
+{
+    assertClosed("resume");
+
+    session = connection.broker.getSessionManager().resume(id);
+    session->attach(*this);
+
+    SequenceNumber resumePoint = session->resuming();
+    peerSession.attached(session->getId(), session->getTimeout());
+    proxy.getSession().ack(resumePoint, SequenceNumberSet());
+}
+
+/** session.flow: not implemented; asserts in debug builds and throws otherwise. */
+void PreviewSessionHandler::flow(bool /*active*/)
+{
+    assertAttached("flow");
+    // TODO aconway 2007-09-19: Removed in 0-10, remove
+    assert(0);
+    throw NotImplementedException("session.flow");
+}
+
+/** session.flow-ok: not implemented; asserts in debug builds and throws otherwise. */
+void PreviewSessionHandler::flowOk(bool /*active*/)
+{
+    assertAttached("flowOk");
+    // TODO aconway 2007-09-19: Removed in 0-10, remove
+    assert(0);
+    throw NotImplementedException("session.flowOk");
+}
+
+/**
+ * session.close from the peer: requires an attached session. Detaches and
+ * discards the session, confirms with session.closed(REPLY_SUCCESS, "ok")
+ * and asks the connection to close this channel.
+ */
+void PreviewSessionHandler::close() {
+ assertAttached("close");
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session->detach();
+ session.reset();
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ // NOTE(review): closeChannel presumably removes this handler from the
+ // connection's channel map - confirm; it is deliberately the last call.
+ connection.closeChannel(channel.get());
+}
+
+/**
+ * session.closed from the peer: log the reply, leave the "ignoring" state
+ * and drop the attached session, if any.
+ *
+ * Unlike the other session commands this one does not call
+ * assertAttached(), so it must tolerate a channel with no session.
+ */
+void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) {
+    QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+    ignoring=false;
+    if (session.get()) {
+        session->detach();
+        session.reset();
+    }
+}
+
+/**
+ * Suspend the session locally, without notifying the peer: detach it,
+ * pass it to the session manager and clear it from this handler. Does
+ * nothing if there is no session or it is not attached.
+ */
+void PreviewSessionHandler::localSuspend()
+{
+    if (!session.get() || !session->isAttached())
+        return;
+
+    session->detach();
+    connection.broker.getSessionManager().suspend(session);
+    session.reset();
+}
+
+/**
+ * session.suspend from the peer: requires an attached session. Suspends
+ * it locally, sends session.detached and asks the connection to close
+ * this channel.
+ */
+void PreviewSessionHandler::suspend() {
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ // NOTE(review): closeChannel presumably removes this handler from the
+ // connection's channel map - confirm; it is deliberately the last call.
+ connection.closeChannel(channel.get());
+}
+
+/**
+ * session.ack from the peer: requires an attached session. Records the
+ * cumulative mark with the session; if the session was in the RESUMING
+ * state when the ack arrived, the session's replay frames are then sent
+ * out again through handleOut().
+ */
+void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
+                                const SequenceNumberSet& /*seenFrameSet*/)
+{
+    assertAttached("ack");
+
+    // Sample the state before recording the ack.
+    const bool wasResuming = (session->getState() == SessionState::RESUMING);
+    session->receivedAck(cumulativeSeenMark);
+    if (!wasResuming)
+        return;
+
+    framing::SessionState::Replay replay=session->replay();
+    std::for_each(replay.begin(), replay.end(),
+                  boost::bind(&PreviewSessionHandler::handleOut, this, _1));
+}
+
+/** session.high-water-mark: not implemented; asserts in debug builds and throws otherwise. */
+void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/)
+{
+    // TODO aconway 2007-10-02: may be removed from spec.
+    assert(0);
+    throw NotImplementedException("session.high-water-mark");
+}
+
+/**
+ * session.solicit-ack from the peer: requires an attached session; reply
+ * with an ack carrying the mark from the session's sendingAck() call.
+ */
+void PreviewSessionHandler::solicitAck()
+{
+    assertAttached("solicit-ack");
+    peerSession.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+/**
+ * session.attached from the peer (this side in the client role): open a
+ * new session through the session manager and attach it to this handler.
+ * The session id supplied by the peer is not used.
+ */
+void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+    std::auto_ptr<SessionState> opened(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    // Take over ownership of the newly opened session.
+    session.reset(opened.release());
+}
+
+/**
+ * session.detached from the peer (this side in the client role): pass the
+ * session to the session manager for suspension and clear it from this
+ * handler.
+ * NOTE(review): session is passed on without a null check.
+ */
+void PreviewSessionHandler::detached() {
+    connection.broker.getSessionManager().suspend(session);
+    session.reset();
+}
+
+/** The owning connection, viewed through its ConnectionState interface. */
+ConnectionState& PreviewSessionHandler::getConnection()
+{
+    return connection;
+}
+
+/** Const overload of getConnection(). */
+const ConnectionState& PreviewSessionHandler::getConnection() const
+{
+    return connection;
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,111 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H
+#define QPID_BROKER_PREVIEWSESSIONHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+class SessionState;
+
+/**
+ * A SessionHandler is associated with each active channel. It
+ * receives incoming frames, handles session commands and manages the
+ * association between the channel and a session.
+ */
+// Implements both the server-side and the client-side session operations,
+// and the SessionContext interface through which a session reaches its
+// connection, proxy and channel.
+class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
+ public framing::AMQP_ClientOperations::SessionHandler,
+ public SessionContext,
+ private boost::noncopyable
+{
+ public:
+ PreviewSessionHandler(PreviewConnection&, framing::ChannelId);
+ ~PreviewSessionHandler();
+
+ /** Returns 0 if not attached to a session */
+ SessionState* getSession() { return session.get(); }
+ const SessionState* getSession() const { return session.get(); }
+
+ framing::ChannelId getChannel() const { return channel.get(); }
+
+ ConnectionState& getConnection();
+ const ConnectionState& getConnection() const;
+
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+ const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+
+ // Called by closing connection.
+ void localSuspend();
+ // SessionContext::detach() is implemented as a local suspend.
+ void detach() { localSuspend(); }
+
+ protected:
+ // Frame entry points for the incoming and outgoing directions.
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
+
+ private:
+ /// Session methods
+ void open(uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void close();
+ void closed(uint16_t replyCode, const std::string& replyText);
+ void resume(const framing::Uuid& sessionId);
+ void suspend();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+
+ //extra methods required for assuming client role
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void detached();
+
+
+ // Precondition checks that throw when the channel is in the wrong state.
+ void assertAttached(const char* method) const;
+ // NOTE(review): no definition of assertActive appears in
+ // PreviewSessionHandler.cpp in this commit.
+ void assertActive(const char* method) const;
+ void assertClosed(const char* method) const;
+
+
+ PreviewConnection& connection;
+ framing::ChannelHandler channel;
+ // Proxy constructed over this handler's own out handler (see the .cpp).
+ framing::AMQP_ClientProxy proxy;
+ // Session-class proxy constructed directly over the channel.
+ framing::AMQP_ClientProxy::Session peerSession;
+ // True after a channel exception, while trailing frames are dropped.
+ bool ignoring;
+ // Attached session; null when the channel has no session.
+ std::auto_ptr<SessionState> session;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_PREVIEWSESSIONHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Feb 21 09:40:42 2008
@@ -21,11 +21,10 @@
#include "SemanticHandler.h"
#include "SemanticState.h"
-#include "SessionHandler.h"
+#include "SessionContext.h"
#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "Connection.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/ServerInvoker.h"
@@ -165,7 +164,7 @@
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- SessionHandler* handler = session.getHandler();
+ SessionContext* handler = session.getHandler();
if (handler) {
uint32_t maxFrameSize = handler->getConnection().getFrameMax();
MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Thu Feb 21 09:40:42 2008
@@ -77,7 +77,7 @@
DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ //Connection& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getBroker(); }
public:
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_SESSIONCONTEXT_H
+#define QPID_BROKER_SESSIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "ConnectionState.h"
+
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Abstract interface through which session-level code reaches the handler
+ * that owns the session. It is the base of both SessionHandler and
+ * PreviewSessionHandler.
+ */
+class SessionContext : public framing::FrameHandler::InOutHandler
+{
+ public:
+ // Passes a null first argument and the given output handler to InOutHandler.
+ SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
+ virtual ~SessionContext(){}
+ /** Connection state shared by all sessions on the connection. */
+ virtual ConnectionState& getConnection() = 0;
+ virtual const ConnectionState& getConnection() const = 0;
+ /** Proxy used to send methods to the peer. */
+ virtual framing::AMQP_ClientProxy& getProxy() = 0;
+ virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
+ /** Detach the session from this context. */
+ virtual void detach() = 0;
+ /** Id of the channel this context is bound to. */
+ virtual framing::ChannelId getChannel() const = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSIONCONTEXT_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -36,7 +36,7 @@
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : InOutHandler(0, &c.getOutput()),
+ : SessionContext(c.getOutput()),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -203,5 +203,9 @@
connection.broker.getSessionManager().suspend(session);
session.reset();
}
+
+
+ConnectionState& SessionHandler::getConnection() { return connection; } // SessionContext override: exposes the handler's connection member as ConnectionState. NOTE(review): presumably out of line because the conversion needs the complete Connection type - confirm.
+const ConnectionState& SessionHandler::getConnection() const { return connection; } // Const overload of the accessor above.
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Thu Feb 21 09:40:42 2008
@@ -28,6 +28,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
#include <boost/noncopyable.hpp>
@@ -42,9 +43,9 @@
* receives incoming frames, handles session commands and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::FrameHandler::InOutHandler,
- public framing::AMQP_ServerOperations::SessionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
+ public SessionContext,
private boost::noncopyable
{
public:
@@ -57,14 +58,15 @@
framing::ChannelId getChannel() const { return channel.get(); }
- Connection& getConnection() { return connection; }
- const Connection& getConnection() const { return connection; }
+ ConnectionState& getConnection();
+ const ConnectionState& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
// Called by closing connection.
void localSuspend();
+ void detach() { localSuspend(); } // Implements SessionContext::detach() by delegating to localSuspend().
protected:
void handleIn(framing::AMQFrame&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Thu Feb 21 09:40:42 2008
@@ -45,7 +45,7 @@
// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
- SessionHandler& h, uint32_t timeout_)
+ SessionContext& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Thu Feb 21 09:40:42 2008
@@ -38,7 +38,7 @@
namespace broker {
class SessionState;
-class SessionHandler;
+class SessionContext;
/**
* Create and manage SessionState objects.
@@ -57,7 +57,7 @@
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
/** Suspend a session, start it's timeout counter.
* The factory takes ownership.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Feb 21 09:40:42 2008
@@ -20,8 +20,8 @@
*/
#include "SessionState.h"
#include "SessionManager.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SessionContext.h"
+#include "ConnectionState.h"
#include "Broker.h"
#include "SemanticHandler.h"
#include "qpid/framing/reply_exceptions.h"
@@ -37,7 +37,7 @@
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
@@ -76,7 +76,7 @@
mgmtObject->resourceDestroy ();
}
-SessionHandler* SessionState::getHandler() {
+SessionContext* SessionState::getHandler() {
return handler;
}
@@ -85,7 +85,7 @@
return getHandler()->getProxy();
}
-Connection& SessionState::getConnection() {
+ConnectionState& SessionState::getConnection() {
assert(isAttached());
return getHandler()->getConnection();
}
@@ -100,7 +100,7 @@
}
}
-void SessionState::attach(SessionHandler& h) {
+void SessionState::attach(SessionContext& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
@@ -141,7 +141,7 @@
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->localSuspend ();
+ handler->detach();
}
status = Manageable::STATUS_OK;
break;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Feb 21 09:40:42 2008
@@ -48,10 +48,10 @@
namespace broker {
class SemanticHandler;
-class SessionHandler;
+class SessionContext;
class SessionManager;
class Broker;
-class Connection;
+class ConnectionState;
/**
* Broker-side session state includes sessions handler chains, which may
@@ -67,16 +67,16 @@
bool isAttached() { return handler; }
void detach();
- void attach(SessionHandler& handler);
+ void attach(SessionContext& handler);
- SessionHandler* getHandler();
+ SessionContext* getHandler();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
- Connection& getConnection();
+ ConnectionState& getConnection();
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
@@ -92,14 +92,14 @@
// Normally SessionManager creates sessions.
SessionState(SessionManager*,
- SessionHandler* out,
+ SessionContext* out,
uint32_t timeout,
uint32_t ackInterval);
private:
SessionManager* factory;
- SessionHandler* handler;
+ SessionContext* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Thu Feb 21 09:40:42 2008
@@ -46,9 +46,9 @@
class ScopedSync
{
- Session_0_10& session;
+ Session& session;
public:
- ScopedSync(Session_0_10& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
+ ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
~ScopedSync() { session.setSynchronous(false); }
};
@@ -63,7 +63,7 @@
join();
}
-void Channel::open(const Session_0_10& s)
+void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h Thu Feb 21 09:40:42 2008
@@ -29,7 +29,7 @@
#include "Message.h"
#include "Queue.h"
#include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/Exception.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -79,7 +79,7 @@
bool running;
ConsumerMap consumers;
- Session_0_10 session;
+ Session session;
framing::ChannelId channelId;
sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
@@ -88,7 +88,7 @@
void stop();
- void open(const Session_0_10& session);
+ void open(const Session& session);
void closeInternal();
void join();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Thu Feb 21 09:40:42 2008
@@ -73,7 +73,7 @@
channel.open(newSession(ASYNC));
}
-Session_0_10 Connection::newSession(SynchronousMode sync,
+Session Connection::newSession(SynchronousMode sync,
uint32_t detachedLifetime)
{
shared_ptr<SessionCore> core(
@@ -81,10 +81,10 @@
core->setSync(sync);
impl->addSession(core);
core->open(detachedLifetime);
- return Session_0_10(core);
+ return Session(core);
}
-void Connection::resume(Session_0_10& session) {
+void Connection::resume(Session& session) {
session.impl->setChannel(++channelIdCounter);
impl->addSession(session.impl);
session.impl->resume(impl);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Feb 21 09:40:42 2008
@@ -25,7 +25,7 @@
#include <string>
#include "Channel.h"
#include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/Uuid.h"
@@ -134,13 +134,13 @@
* that the broker may discard the session state. Default is 0,
* meaning the session cannot be resumed.
*/
- Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
+ Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
/**
* Resume a suspendded session. A session may be resumed
* on a different connection to the one that created it.
*/
- void resume(Session_0_10& session);
+ void resume(Session& session);
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Feb 21 09:40:42 2008
@@ -20,7 +20,6 @@
*/
#include "Dispatcher.h"
-#include "qpid/client/Session_0_10.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -38,7 +37,7 @@
namespace qpid {
namespace client {
-Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
+Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
@@ -48,7 +47,7 @@
}
}
-Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+Dispatcher::Dispatcher(Session& s, const std::string& q)
: session(s), running(false), autoStop(true)
{
queue = q.empty() ?
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Thu Feb 21 09:40:42 2008
@@ -25,6 +25,7 @@
#include <memory>
#include <string>
#include <boost/shared_ptr.hpp>
+#include "qpid/client/Session.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -34,17 +35,15 @@
namespace qpid {
namespace client {
-class Session_0_10;
-
class Subscriber : public MessageListener
{
- Session_0_10& session;
+ Session& session;
MessageListener* const listener;
AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
+ Subscriber(Session& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -56,7 +55,7 @@
typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
sys::Mutex lock;
sys::Thread worker;
- Session_0_10& session;
+ Session& session;
Demux::QueuePtr queue;
bool running;
bool autoStop;
@@ -68,7 +67,7 @@
bool isStopped();
public:
- Dispatcher(Session_0_10& session, const std::string& queue = "");
+ Dispatcher(Session& session, const std::string& queue = "");
void start();
void run();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Thu Feb 21 09:40:42 2008
@@ -50,7 +50,7 @@
private:
friend class SubscriptionManager;
- Session_0_10 session;
+ Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Thu Feb 21 09:40:42 2008
@@ -22,7 +22,7 @@
*
*/
#include <string>
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/TransferContent.h"
@@ -63,18 +63,18 @@
return getMessageProperties().getApplicationHeaders();
}
- void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
+ void acknowledge(Session& session, bool cumulative = true, bool send = true) const
{
session.getExecution().completed(id, cumulative, send);
}
void acknowledge(bool cumulative = true, bool send = true) const
{
- const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
+ const_cast<Session&>(session).getExecution().completed(id, cumulative, send);
}
/**@internal for incoming messages */
- Message(const framing::FrameSet& frameset, Session_0_10 s) :
+ Message(const framing::FrameSet& frameset, Session s) :
method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
{
populate(frameset);
@@ -91,12 +91,12 @@
}
/**@internal use for incoming messages. */
- void setSession(Session_0_10 s) { session=s; }
+ void setSession(Session s) { session=s; }
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
framing::SequenceNumber id;
- Session_0_10 session;
+ Session session;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h Thu Feb 21 09:40:42 2008
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session_99_0.h"
namespace qpid {
namespace client {
@@ -31,7 +31,7 @@
*
* \ingroup clientapi
*/
-typedef Session_0_10 Session;
+typedef Session_99_0 Session;
}} // namespace qpid::client