You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/25 17:56:41 UTC
svn commit: r630934 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/cluster/ qpid/framing/ tests/
Author: gsim
Date: Mon Feb 25 08:56:29 2008
New Revision: 630934
URL: http://svn.apache.org/viewvc?rev=630934&view=rev
Log:
Some refactoring of the 0-10 codepath (being migrated to final spec) that primarily colocates the current session and execution layers to facilitate implementing the new session layer that will now encompass this behaviour.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Feb 25 08:56:29 2008
@@ -166,6 +166,8 @@
qpid/broker/PreviewConnection.cpp \
qpid/broker/PreviewConnectionHandler.cpp \
qpid/broker/PreviewSessionHandler.cpp \
+ qpid/broker/PreviewSessionManager.cpp \
+ qpid/broker/PreviewSessionState.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -270,6 +272,8 @@
qpid/broker/PreviewConnection.h \
qpid/broker/PreviewConnectionHandler.h \
qpid/broker/PreviewSessionHandler.h \
+ qpid/broker/PreviewSessionManager.h \
+ qpid/broker/PreviewSessionState.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 25 08:56:29 2008
@@ -109,7 +109,8 @@
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
- sessionManager(conf.ack)
+ sessionManager(conf.ack),
+ previewSessionManager(conf.ack)
{
// Early-Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 25 08:56:29 2008
@@ -30,6 +30,7 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
+#include "PreviewSessionManager.h"
#include "Vhost.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -109,6 +110,7 @@
DataDir& getDataDir() { return dataDir; }
SessionManager& getSessionManager() { return sessionManager; }
+ PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -136,6 +138,7 @@
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
+ PreviewSessionManager previewSessionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Mon Feb 25 08:56:29 2008
@@ -68,7 +68,7 @@
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- framing::ProtocolVersion getVersion() const { return session.getVersion();}
+ framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
AccessHandler* getAccessHandler() {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Mon Feb 25 08:56:29 2008
@@ -20,7 +20,7 @@
*/
#include "SemanticState.h"
-#include "SessionState.h"
+#include "SessionContext.h"
#include "ConnectionState.h"
namespace qpid {
@@ -35,13 +35,13 @@
class HandlerImpl {
protected:
SemanticState& state;
- SessionState& session;
+ SessionContext& session;
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
ConnectionState& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp Mon Feb 25 08:56:29 2008
@@ -19,7 +19,7 @@
*/
#include "PreviewSessionHandler.h"
-#include "SessionState.h"
+#include "PreviewSessionState.h"
#include "PreviewConnection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
@@ -36,7 +36,7 @@
using namespace qpid::sys;
PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -106,15 +106,15 @@
void PreviewSessionHandler::open(uint32_t detachedLifetime) {
assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
peerSession.attached(session->getId(), session->getTimeout());
}
void PreviewSessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
+ session = connection.broker.getPreviewSessionManager().resume(id);
session->attach(*this);
SequenceNumber seq = session->resuming();
peerSession.attached(session->getId(), session->getTimeout());
@@ -154,7 +154,7 @@
void PreviewSessionHandler::localSuspend() {
if (session.get() && session->isAttached()) {
session->detach();
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
}
@@ -171,7 +171,7 @@
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
+ if (session->getState() == PreviewSessionState::RESUMING) {
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
@@ -193,14 +193,14 @@
void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
}
void PreviewSessionHandler::detached()
{
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h Mon Feb 25 08:56:29 2008
@@ -36,7 +36,7 @@
namespace broker {
class PreviewConnection;
-class SessionState;
+class PreviewSessionState;
/**
* A SessionHandler is associated with each active channel. It
@@ -45,7 +45,7 @@
*/
class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -53,8 +53,8 @@
~PreviewSessionHandler();
/** Returns 0 if not attached to a session */
- SessionState* getSession() { return session.get(); }
- const SessionState* getSession() const { return session.get(); }
+ PreviewSessionState* getSession() { return session.get(); }
+ const PreviewSessionState* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel.get(); }
@@ -101,7 +101,7 @@
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
- std::auto_ptr<SessionState> session;
+ std::auto_ptr<PreviewSessionState> session;
};
}} // namespace qpid::broker
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp Mon Feb 25 08:56:29 2008
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionManager.h"
+#include "PreviewSessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/memory.h"
+
+#include <boost/bind.hpp>
+#include <boost/range.hpp>
+
+#include <algorithm>
+#include <functional>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+using namespace sys;
+using namespace framing;
+
+PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} // Stores a in ack; open() passes ack to every PreviewSessionState it creates.
+
+PreviewSessionManager::~PreviewSessionManager() {} // Empty body: all cleanup is left to the members' own destructors.
+
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( // Create a session bound to handler h, record its id as active and notify observers.
+ PreviewSessionHandler& h, uint32_t timeout_)
+{
+ Mutex::ScopedLock l(lock); // Held for the whole call, including the observer callbacks below.
+ std::auto_ptr<PreviewSessionState> session(
+ new PreviewSessionState(this, &h, timeout_, ack)); // 'this' is stored by the session as its factory back-pointer.
+ active.insert(session->getId());
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::opened, _1,boost::ref(*session))); // Invokes opened(*session) on each registered observer.
+ return session; // Ownership of the new session passes to the caller.
+}
+
+void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { // Take ownership of session, mark it suspended and park it until resumed or expired.
+ Mutex::ScopedLock l(lock);
+ active.erase(session->getId()); // The id is no longer active, so a later resume() of it is permitted.
+ session->suspend();
+ session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); // Expiry = current time plus the session timeout scaled by TIME_SEC.
+ if (session->mgmtObject.get() != 0) // The management object is optional; only update it when present.
+ session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
+ suspended.push_back(session.release()); // In expiry order. NOTE(review): appending preserves that order only if a later suspend never expires earlier (e.g. equal timeouts); eraseExpired() relies on it - confirm.
+ eraseExpired(); // Opportunistically drop sessions whose expiry has already passed.
+}
+
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) // Move the suspended session with this id back to active and return it to the caller.
+{
+ Mutex::ScopedLock l(lock);
+ eraseExpired(); // Purge expired sessions first, so an expired id is reported as not found below.
+ if (active.find(id) != active.end()) // An id that is currently active cannot be resumed.
+ throw SessionBusyException(
+ QPID_MSG("Session already active: " << id));
+ Suspended::iterator i = std::find_if( // Linear search of the suspended list for a session whose getId() equals id.
+ suspended.begin(), suspended.end(),
+ boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
+ );
+ if (i == suspended.end()) // No suspended session has this id.
+ throw InvalidArgumentException(
+ QPID_MSG("No suspended session with id=" << id));
+ active.insert(id);
+ std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); // Detach the entry from the suspended list and hand its pointer to the returned auto_ptr.
+ return state;
+}
+
+void PreviewSessionManager::erase(const framing::Uuid& id) // Remove id from the active set; ~PreviewSessionState calls this through its factory pointer.
+{
+ Mutex::ScopedLock l(lock);
+ active.erase(id);
+}
+
+void PreviewSessionManager::eraseExpired() { // Drop every suspended session whose expiry time is earlier than now().
+ // Called with lock held.
+ if (!suspended.empty()) {
+ Suspended::iterator keep = std::lower_bound( // First session whose expiry is not less than now(); binary search, so it needs suspended ordered by expiry.
+ suspended.begin(), suspended.end(), now(),
+ boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
+ if (suspended.begin() != keep) { // At least one session precedes 'keep', i.e. has expired.
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep); // Removes the expired range [begin, keep) from the list.
+ }
+ }
+}
+
+void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { // Register an observer; open() calls its opened() for each new session.
+ observers.push_back(o); // NOTE(review): observers is modified here without taking lock, whereas open() iterates it under lock - confirm add() is never called concurrently with open().
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h Mon Feb 25 08:56:29 2008
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
+#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/Uuid.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <set>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+class PreviewSessionState;
+class PreviewSessionHandler;
+
+/**
+ * Create and manage PreviewSessionState objects.
+ */
+class PreviewSessionManager : private boost::noncopyable {
+ public:
+ /**
+ * Observer notified of PreviewSessionManager events.
+ */
+ struct Observer : public RefCounted {
+ virtual void opened(PreviewSessionState&) {} // Default implementation ignores the event.
+ };
+
+ PreviewSessionManager(uint32_t ack); // ack is stored and passed to every session created by open().
+
+ ~PreviewSessionManager();
+
+ /** Open a new active session; the caller takes ownership. */
+ std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
+
+ /** Suspend a session and start its timeout counter.
+ * The manager takes ownership of the session.
+ */
+ void suspend(std::auto_ptr<PreviewSessionState> session);
+
+ /** Resume a suspended session; throws SessionBusyException if it is already active.
+ *@throw InvalidArgumentException if no suspended session has the id (timed out or nonexistent).
+ */
+ std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
+
+ /** Add an Observer. */
+ void add(const intrusive_ptr<Observer>&);
+
+ private:
+ typedef boost::ptr_vector<PreviewSessionState> Suspended;
+ typedef std::set<framing::Uuid> Active;
+ typedef std::vector<intrusive_ptr<Observer> > Observers;
+
+ void erase(const framing::Uuid&); // Removes an id from the active set (takes lock itself).
+ void eraseExpired(); // Must be called with lock already held.
+
+ sys::Mutex lock; // Taken by open(), suspend(), resume() and erase().
+ Suspended suspended; // Suspended sessions owned by the manager.
+ Active active; // Ids of sessions that are currently open or resumed.
+ uint32_t ack; // Passed unchanged to the PreviewSessionState constructor.
+ Observers observers; // Notified from open().
+
+ friend class PreviewSessionState; // removes deleted sessions from active set.
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+
+
+#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp Mon Feb 25 08:56:29 2008
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewSessionState.h"
+#include "PreviewSessionManager.h"
+#include "PreviewSessionHandler.h"
+#include "ConnectionState.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+PreviewSessionState::PreviewSessionState( // Build a session attached to handler h, wire its frame chains and register a management object if an agent exists.
+ PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack, timeout_ > 0), // Second base-class argument is true only for a non-zero timeout.
+ factory(f), handler(h), id(true), timeout(timeout_), // id(true): presumably requests a freshly generated Uuid - confirm against framing::Uuid.
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ in.next = semanticHandler.get(); // Incoming frames are passed on to the semantic handler.
+ out.next = &handler->out; // Outgoing frames are passed on to the attached handler's out chain.
+
+ getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0) // Management registration is skipped when there is no vhost object...
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0) // ...or no management agent.
+ {
+ mgmtObject = management::Session::shared_ptr
+ (new management::Session (this, parent, id.str ()));
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
+ mgmtObject->set_detachedLifespan (getTimeout());
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+PreviewSessionState::~PreviewSessionState() { // Unregister the session from its manager and from management.
+ // Remove ID from active session list.
+ if (factory) // A null factory pointer is tolerated; nothing to unregister then.
+ factory->erase(getId());
+ if (mgmtObject.get () != 0) // Only present when the constructor created a management object.
+ mgmtObject->resourceDestroy ();
+}
+
+PreviewSessionHandler* PreviewSessionState::getHandler() { // Returns the attached handler, or 0 after detach().
+ return handler;
+}
+
+AMQP_ClientProxy& PreviewSessionState::getProxy() { // Returns the attached handler's client proxy.
+ assert(isAttached()); // Precondition: a handler must be attached, since it is dereferenced below.
+ return getHandler()->getProxy();
+}
+
+ConnectionState& PreviewSessionState::getConnection() { // Returns the connection of the attached handler.
+ assert(isAttached()); // Precondition: a handler must be attached, since it is dereferenced below.
+ return getHandler()->getConnection();
+}
+
+void PreviewSessionState::detach() { // Disconnect the session from its handler and connection.
+ getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); // Must run before handler is cleared: getConnection() goes through the handler.
+ Mutex::ScopedLock l(lock);
+ handler = 0; out.next = 0; // After this isAttached() is false and the out chain has no next handler.
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (0);
+ }
+}
+
+void PreviewSessionState::attach(PreviewSessionHandler& h) { // Bind the session to handler h and re-register its output task with h's connection.
+ {
+ Mutex::ScopedLock l(lock); // Lock covers only the pointer and management updates in this inner scope.
+ handler = &h;
+ out.next = &handler->out; // Outgoing frames now go to the new handler's out chain.
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
+ }
+ h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); // Performed after the lock has been released.
+}
+
+void PreviewSessionState::activateOutput() // Forward an output-activation request to the connection; does nothing while detached.
+{
+ Mutex::ScopedLock l(lock);
+ if (isAttached()) { // Checked under lock, the same lock detach() holds while clearing handler.
+ getConnection().outputTasks.activateOutput();
+ }
+}
+ //This class could be used as the callback for queue notifications
+ //if not attached, it can simply ignore the callback, else pass it
+ //on to the connection
+
+ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const // Returns mgmtObject as a generic ManagementObject pointer.
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject); // mgmtObject is only set when the constructor found a management agent.
+}
+
+Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, // Dispatch a management method call by id; the arguments are unused.
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; // Returned for any methodId not handled by the switch.
+
+ switch (methodId)
+ {
+ case management::Session::METHOD_DETACH :
+ if (handler != 0) // Already detached: nothing to do, but still reported as OK.
+ {
+ handler->detach();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_CLOSE : // Implementation below is commented out, so this falls through to STATUS_NOT_IMPLEMENTED.
+ /*
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+ */
+
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h Mon Feb 25 08:56:29 2008
@@ -0,0 +1,124 @@
+#ifndef QPID_BROKER_PREVIEWSESSION_H
+#define QPID_BROKER_PREVIEWSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <set>
+#include <vector>
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class SemanticHandler;
+class PreviewSessionHandler;
+class PreviewSessionManager;
+class Broker;
+class ConnectionState;
+
+/**
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
+ */
+class PreviewSessionState : public framing::SessionState,
+ public SessionContext,
+ public framing::FrameHandler::Chains,
+ public management::Manageable
+{
+ public:
+ ~PreviewSessionState();
+ bool isAttached() { return handler; } // True while a handler pointer is set, i.e. between attach() and detach().
+
+ void detach(); // Clears the handler and removes the output task from the connection.
+ void attach(PreviewSessionHandler& handler); // Binds to a handler and adds the output task to its connection.
+
+
+ PreviewSessionHandler* getHandler(); // 0 when not attached.
+
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ ConnectionState& getConnection();
+
+ uint32_t getTimeout() const { return timeout; }
+ Broker& getBroker() { return broker; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ /** OutputControl **/
+ void activateOutput();
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
+ // Normally PreviewSessionManager creates sessions.
+ PreviewSessionState(PreviewSessionManager*,
+ PreviewSessionHandler* out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
+
+ private:
+ PreviewSessionManager* factory; // Manager notified from the destructor; may be null.
+ PreviewSessionHandler* handler; // Currently attached handler, 0 when detached.
+ framing::Uuid id;
+ uint32_t timeout; // Multiplied by TIME_SEC by the manager when computing expiry.
+ sys::AbsTime expiry; // Used by PreviewSessionManager.
+ Broker& broker; // Taken from the handler's connection at construction.
+ framing::ProtocolVersion version; // Taken from the handler's connection at construction.
+ sys::Mutex lock; // Taken in detach(), attach() and activateOutput().
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
+ management::Session::shared_ptr mgmtObject; // Null unless a vhost object and management agent existed at construction.
+
+ friend class PreviewSessionManager;
+};
+
+
+inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { // Stream a session as its id.
+ return out << session.getId();
+}
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_PREVIEWSESSION_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Mon Feb 25 08:56:29 2008
@@ -22,7 +22,6 @@
#include "SemanticHandler.h"
#include "SemanticState.h"
#include "SessionContext.h"
-#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -37,9 +36,9 @@
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(SessionState& s) :
+SemanticHandler::SemanticHandler(SessionContext& s) :
state(*this,s), session(s),
- msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
+ msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
{}
@@ -164,13 +163,8 @@
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- SessionContext* handler = session.getHandler();
- if (handler) {
- uint32_t maxFrameSize = handler->getConnection().getFrameMax();
- MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
- } else {
- QPID_LOG(error, "Dropping message as session is no longer attached to a channel.");
- }
+ uint32_t maxFrameSize = session.getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
return outgoing.hwm;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Mon Feb 25 08:56:29 2008
@@ -46,7 +46,7 @@
namespace broker {
-class SessionState;
+class SessionContext;
class SemanticHandler : public DeliveryAdapter,
public framing::FrameHandler,
@@ -56,7 +56,7 @@
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
SemanticState state;
- SessionState& session;
+ SessionContext& session;
// TODO aconway 2007-09-20: Why are these on the handler rather than the
// state?
IncomingExecutionContext incoming;
@@ -78,10 +78,10 @@
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
//Connection& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
public:
- SemanticHandler(SessionState& session);
+ SemanticHandler(SessionContext& session);
//frame handler:
void handle(framing::AMQFrame& frame);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 25 08:56:29 2008
@@ -19,7 +19,7 @@
*
*/
-#include "SessionState.h"
+#include "SessionContext.h"
#include "BrokerAdapter.h"
#include "Queue.h"
#include "Connection.h"
@@ -56,7 +56,7 @@
using namespace qpid::sys;
using namespace qpid::ptr_map;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
deliveryAdapter(da),
prefetchSize(0),
@@ -263,21 +263,16 @@
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (parent->getSession().isAttached() && accept(msg.payload)) {
- allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
- return true;
- } else {
- QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
- return false;
+ allocateCredit(msg.payload);
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter.deliver(msg, token);
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg.payload);
}
+ return true;
}
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
@@ -331,7 +326,7 @@
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(getSession().getBroker(), queue);
+ Queue::tryAutoDelete(getSession().getConnection().getBroker(), queue);
}
}
}
@@ -584,7 +579,7 @@
if (name.empty()) {
throw NotAllowedException(QPID_MSG("No queue name specified."));
} else {
- queue = session.getBroker().getQueues().find(name);
+ queue = session.getConnection().getBroker().getQueues().find(name);
if (!queue)
throw NotFoundException(QPID_MSG("Queue not found: "<<name));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Feb 25 08:56:29 2008
@@ -45,7 +45,7 @@
namespace qpid {
namespace broker {
-class SessionState;
+class SessionContext;
/**
* SemanticState holds the L3 and L4 state of an open session, whether
@@ -98,7 +98,7 @@
typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
- SessionState& session;
+ SessionContext& session;
DeliveryAdapter& deliveryAdapter;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
@@ -129,10 +129,10 @@
void cancel(ConsumerImpl&);
public:
- SemanticState(DeliveryAdapter&, SessionState&);
+ SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
- SessionState& getSession() { return session; }
+ SessionContext& getSession() { return session; }
/**
* Get named queue, never returns 0.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Feb 25 08:56:29 2008
@@ -25,6 +25,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
@@ -33,17 +34,12 @@
namespace qpid {
namespace broker {
-class SessionContext : public framing::FrameHandler::InOutHandler
+class SessionContext : public sys::OutputControl
{
public:
- SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
virtual ~SessionContext(){}
virtual ConnectionState& getConnection() = 0;
- virtual const ConnectionState& getConnection() const = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
- virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
- virtual void detach() = 0;
- virtual framing::ChannelId getChannel() const = 0;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon Feb 25 08:56:29 2008
@@ -21,6 +21,7 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
@@ -36,7 +37,7 @@
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -58,18 +59,22 @@
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
+ if (!ignoring) {
+ if (m &&
+ (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
+ invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ return;
+ } else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else {
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
}
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
@@ -91,10 +96,12 @@
}
void SessionHandler::assertAttached(const char* method) const {
- if (!session.get())
+ if (!session.get()) {
+ std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
+ }
}
void SessionHandler::assertClosed(const char* method) const {
@@ -207,5 +214,33 @@
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ assertAttached("complete");
+ session->complete(cumulative, range);
+}
+
+void SessionHandler::flush()
+{
+ assertAttached("flush");
+ session->flush();
+}
+void SessionHandler::sync()
+{
+ assertAttached("sync");
+ session->sync();
+}
+
+void SessionHandler::noop()
+{
+ assertAttached("noop");
+ session->noop();
+}
+
+void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Mon Feb 25 08:56:29 2008
@@ -28,7 +28,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/noncopyable.hpp>
@@ -36,16 +36,18 @@
namespace broker {
class Connection;
+class ConnectionState;
class SessionState;
/**
* A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
+ * receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::AMQP_ServerOperations::ExecutionHandler,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -90,11 +92,16 @@
void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
void detached();
+ //Execution methods:
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
-
Connection& connection;
framing::ChannelHandler channel;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Mon Feb 25 08:56:29 2008
@@ -45,7 +45,7 @@
// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
- SessionContext& h, uint32_t timeout_)
+ SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Mon Feb 25 08:56:29 2008
@@ -38,7 +38,7 @@
namespace broker {
class SessionState;
-class SessionContext;
+class SessionHandler;
/**
* Create and manage SessionState objects.
@@ -57,7 +57,7 @@
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_);
/** Suspend a session, start it's timeout counter.
* The factory takes ownership.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb 25 08:56:29 2008
@@ -19,12 +19,16 @@
*
*/
#include "SessionState.h"
-#include "SessionManager.h"
-#include "SessionContext.h"
-#include "ConnectionState.h"
#include "Broker.h"
+#include "ConnectionState.h"
+#include "MessageDelivery.h"
#include "SemanticHandler.h"
+#include "SessionManager.h"
+#include "SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
@@ -37,17 +41,17 @@
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
+ semanticState(*this, *this),
+ adapter(semanticState),
+ msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+ ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.addOutputTask(&semanticState);
Manageable* parent = broker.GetVhostObject ();
@@ -76,7 +80,7 @@
mgmtObject->resourceDestroy ();
}
-SessionContext* SessionState::getHandler() {
+SessionHandler* SessionState::getHandler() {
return handler;
}
@@ -91,20 +95,19 @@
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
+ handler = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
}
}
-void SessionState::attach(SessionContext& h) {
+void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
- out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
@@ -112,7 +115,7 @@
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ h.getConnection().outputTasks.addOutputTask(&semanticState);
}
void SessionState::activateOutput()
@@ -163,6 +166,101 @@
}
return status;
+}
+
+void SessionState::handleCommand(framing::AMQMethodBody* method)
+{
+ SequenceNumber id = incoming.next();
+ Invoker::Result invocation = invoke(adapter, *method);
+ incoming.complete(id);
+
+ if (!invocation.wasHandled()) {
+ throw NotImplementedException("Not implemented");
+ } else if (invocation.hasResult()) {
+ getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ }
+ if (method->isSync()) {
+ incoming.sync(id);
+ sendCompletion();
+ }
+ //TODO: if window gets too large send unsolicited completion
+}
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(incoming.next());
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&getConnection());
+ semanticState.handle(msg);
+ msgBuilder.end();
+ incoming.track(msg);
+ if (msg->getFrames().getMethod()->isSync()) {
+ incoming.sync(msg->getCommandId());
+ sendCompletion();
+ }
+ }
+}
+
+void SessionState::handle(AMQFrame& frame)
+{
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assemblies are non-content bearing and all content-bearing
+ //assemblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod());
+ } else {
+ handleContent(frame);
+ }
+
+}
+
+DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+{
+ uint32_t maxFrameSize = getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
+ return outgoing.hwm;
+}
+
+void SessionState::sendCompletion()
+{
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
+ getProxy().getExecution().complete(mark.getValue(), range);
+}
+
+void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ semanticState.ackCumulative(mark.getValue());
+ }
+ range.processRanges(ackOp);
+}
+
+void SessionState::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
+
+void SessionState::sync()
+{
+ incoming.sync();
+ sendCompletion();
+}
+
+void SessionState::noop()
+{
+ incoming.noop();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb 25 08:56:29 2008
@@ -27,10 +27,15 @@
#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
+#include "BrokerAdapter.h"
+#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
+#include "SessionContext.h"
+#include "SemanticState.h"
+#include "IncomingExecutionContext.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -47,8 +52,7 @@
namespace broker {
-class SemanticHandler;
-class SessionContext;
+class SessionHandler;
class SessionManager;
class Broker;
class ConnectionState;
@@ -58,8 +62,8 @@
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::Chains,
- public sys::OutputControl,
+ public SessionContext,
+ public DeliveryAdapter,
public management::Manageable
{
public:
@@ -67,10 +71,10 @@
bool isAttached() { return handler; }
void detach();
- void attach(SessionContext& handler);
+ void attach(SessionHandler& handler);
- SessionContext* getHandler();
+ SessionHandler* getHandler();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
@@ -85,6 +89,19 @@
/** OutputControl **/
void activateOutput();
+ void handle(framing::AMQFrame& frame);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
+
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void sync();
+ void sendCompletion();
+
+ //delivery adapter methods:
+ DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
+
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
@@ -92,21 +109,32 @@
// Normally SessionManager creates sessions.
SessionState(SessionManager*,
- SessionContext* out,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
private:
+ typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+
SessionManager* factory;
- SessionContext* handler;
+ SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
framing::ProtocolVersion version;
sys::Mutex lock;
- boost::scoped_ptr<SemanticHandler> semanticHandler;
+
+ SemanticState semanticState;
+ BrokerAdapter adapter;
+ MessageBuilder msgBuilder;
+
+ //execution state
+ IncomingExecutionContext incoming;
+ framing::Window outgoing;
+ RangedOperation ackOp;
+
management::Session::shared_ptr mgmtObject;
friend class SessionManager;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb 25 08:56:29 2008
@@ -17,7 +17,7 @@
*/
#include "Cluster.h"
-#include "qpid/broker/SessionState.h"
+#include "qpid/broker/PreviewSessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::PreviewSessionState;
namespace {
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ PreviewSessionState& session;
Cluster& cluster;
bool busy;
Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+ ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
void handle(AMQFrame& f) {
Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@
c.next = h;
}
-struct SessionObserver : public broker::SessionManager::Observer {
+struct SessionObserver : public broker::PreviewSessionManager::Observer {
Cluster& cluster;
SessionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void opened(PreviewSessionState& s) {
// FIXME aconway 2008-01-29: IList for memory management.
ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb 25 08:56:29 2008
@@ -62,7 +62,7 @@
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -116,7 +116,7 @@
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- intrusive_ptr<broker::SessionManager::Observer> observer;
+ intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb 25 08:56:29 2008
@@ -69,7 +69,7 @@
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- broker->getSessionManager().add(cluster->getObserver());
+ broker->getPreviewSessionManager().add(cluster->getObserver());
}
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h Mon Feb 25 08:56:29 2008
@@ -33,6 +33,7 @@
namespace framing {
static ProtocolVersion highestProtocolVersion(99, 0);
+//static ProtocolVersion highestProtocolVersion(0, 10);
} /* namespace framing */
} /* namespace qpid */
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h Mon Feb 25 08:56:29 2008
@@ -39,6 +39,7 @@
void send(const AMQBody&);
ProtocolVersion getVersion() const;
+ FrameHandler& getHandler() { return out; }
protected:
FrameHandler& out;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Feb 25 08:56:29 2008
@@ -92,7 +92,7 @@
BOOST_CHECK_THROW(session.close(), InternalErrorException);
}
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) {
BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
}