You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/25 17:56:41 UTC

svn commit: r630934 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ qpid/framing/ tests/

Author: gsim
Date: Mon Feb 25 08:56:29 2008
New Revision: 630934

URL: http://svn.apache.org/viewvc?rev=630934&view=rev
Log:
Some refactoring of the 0-10 codepath (being migrated to final spec) that primarily colocates the current session and execution layers to facilitate implementing the new session layer that will now encompass this behaviour.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Feb 25 08:56:29 2008
@@ -166,6 +166,8 @@
   qpid/broker/PreviewConnection.cpp \
   qpid/broker/PreviewConnectionHandler.cpp \
   qpid/broker/PreviewSessionHandler.cpp \
+  qpid/broker/PreviewSessionManager.cpp \
+  qpid/broker/PreviewSessionState.cpp \
   qpid/broker/Connection.cpp \
   qpid/broker/ConnectionHandler.cpp \
   qpid/broker/ConnectionFactory.cpp \
@@ -270,6 +272,8 @@
   qpid/broker/PreviewConnection.h \
   qpid/broker/PreviewConnectionHandler.h \
   qpid/broker/PreviewSessionHandler.h \
+  qpid/broker/PreviewSessionManager.h \
+  qpid/broker/PreviewSessionState.h \
   qpid/broker/Connection.h \
   qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionFactory.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 25 08:56:29 2008
@@ -109,7 +109,8 @@
     store(0),
     dataDir(conf.noDataDir ? std::string () : conf.dataDir),
     factory(*this),
-    sessionManager(conf.ack)
+    sessionManager(conf.ack),
+    previewSessionManager(conf.ack)
 {
     // Early-Initialize plugins
     const Plugin::Plugins& plugins=Plugin::getPlugins();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 25 08:56:29 2008
@@ -30,6 +30,7 @@
 #include "MessageStore.h"
 #include "QueueRegistry.h"
 #include "SessionManager.h"
+#include "PreviewSessionManager.h"
 #include "Vhost.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
@@ -109,6 +110,7 @@
     DataDir& getDataDir() { return dataDir; }
 
     SessionManager& getSessionManager() { return sessionManager; }
+    PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
 
     management::ManagementObject::shared_ptr GetManagementObject (void) const;
     management::Manageable*                  GetVhostObject      (void) const;
@@ -136,6 +138,7 @@
     ConnectionFactory factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
+    PreviewSessionManager previewSessionManager;
     management::ManagementAgent::shared_ptr managementAgent;
     management::Broker::shared_ptr mgmtObject;
     Vhost::shared_ptr              vhostObject;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Mon Feb 25 08:56:29 2008
@@ -68,7 +68,7 @@
     DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
     DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
 
-    framing::ProtocolVersion getVersion() const { return session.getVersion();}
+    framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
 
 
     AccessHandler* getAccessHandler() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Mon Feb 25 08:56:29 2008
@@ -20,7 +20,7 @@
  */
 
 #include "SemanticState.h"
-#include "SessionState.h"
+#include "SessionContext.h"
 #include "ConnectionState.h"
 
 namespace qpid {
@@ -35,13 +35,13 @@
 class HandlerImpl {
   protected:
     SemanticState& state;
-    SessionState& session;
+    SessionContext& session;
 
     HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
     ConnectionState& getConnection() { return session.getConnection(); }
-    Broker& getBroker() { return session.getBroker(); }
+    Broker& getBroker() { return session.getConnection().getBroker(); }
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp Mon Feb 25 08:56:29 2008
@@ -19,7 +19,7 @@
  */
 
 #include "PreviewSessionHandler.h"
-#include "SessionState.h"
+#include "PreviewSessionState.h"
 #include "PreviewConnection.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
@@ -36,7 +36,7 @@
 using namespace qpid::sys;
 
 PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
-    : SessionContext(c.getOutput()),
+    : InOutHandler(0, &out),
       connection(c), channel(ch, &c.getOutput()),
       proxy(out),               // Via my own handleOut() for L2 data.
       peerSession(channel),     // Direct to channel for L2 commands.
@@ -106,15 +106,15 @@
 
 void  PreviewSessionHandler::open(uint32_t detachedLifetime) {
     assertClosed("open");
-    std::auto_ptr<SessionState> state(
-        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    std::auto_ptr<PreviewSessionState> state(
+        connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
     session.reset(state.release());
     peerSession.attached(session->getId(), session->getTimeout());
 }
 
 void  PreviewSessionHandler::resume(const Uuid& id) {
     assertClosed("resume");
-    session = connection.broker.getSessionManager().resume(id);
+    session = connection.broker.getPreviewSessionManager().resume(id);
     session->attach(*this);
     SequenceNumber seq = session->resuming();
     peerSession.attached(session->getId(), session->getTimeout());
@@ -154,7 +154,7 @@
 void PreviewSessionHandler::localSuspend() {
     if (session.get() && session->isAttached()) {
         session->detach();
-        connection.broker.getSessionManager().suspend(session);
+        connection.broker.getPreviewSessionManager().suspend(session);
         session.reset();
     }
 }
@@ -171,7 +171,7 @@
                           const SequenceNumberSet& /*seenFrameSet*/)
 {
     assertAttached("ack");
-    if (session->getState() == SessionState::RESUMING) {
+    if (session->getState() == PreviewSessionState::RESUMING) {
         session->receivedAck(cumulativeSeenMark);
         framing::SessionState::Replay replay=session->replay();
         std::for_each(replay.begin(), replay.end(),
@@ -193,14 +193,14 @@
 
 void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
 {
-    std::auto_ptr<SessionState> state(
-        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    std::auto_ptr<PreviewSessionState> state(
+        connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
     session.reset(state.release());
 }
 
 void PreviewSessionHandler::detached()
 {
-    connection.broker.getSessionManager().suspend(session);
+    connection.broker.getPreviewSessionManager().suspend(session);
     session.reset();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h Mon Feb 25 08:56:29 2008
@@ -36,7 +36,7 @@
 namespace broker {
 
 class PreviewConnection;
-class SessionState;
+class PreviewSessionState;
 
 /**
  * A SessionHandler is associated with each active channel. It
@@ -45,7 +45,7 @@
  */
 class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
                        public framing::AMQP_ClientOperations::SessionHandler,
-                       public SessionContext,
+                       public framing::FrameHandler::InOutHandler,
                        private boost::noncopyable
 {
   public:
@@ -53,8 +53,8 @@
     ~PreviewSessionHandler();
 
     /** Returns 0 if not attached to a session */
-    SessionState* getSession() { return session.get(); }
-    const SessionState* getSession() const { return session.get(); }
+    PreviewSessionState* getSession() { return session.get(); }
+    const PreviewSessionState* getSession() const { return session.get(); }
 
     framing::ChannelId getChannel() const { return channel.get(); }
     
@@ -101,7 +101,7 @@
     framing::AMQP_ClientProxy proxy;
     framing::AMQP_ClientProxy::Session peerSession;
     bool ignoring;
-    std::auto_ptr<SessionState> session;
+    std::auto_ptr<PreviewSessionState> session;
 };
 
 }} // namespace qpid::broker

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp Mon Feb 25 08:56:29 2008
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionManager.h"
+#include "PreviewSessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/memory.h"
+
+#include <boost/bind.hpp>
+#include <boost/range.hpp>
+
+#include <algorithm>
+#include <functional>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+using namespace sys;
+using namespace framing;
+
+PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {}
+
+PreviewSessionManager::~PreviewSessionManager() {}
+
+// FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
+std::auto_ptr<PreviewSessionState>  PreviewSessionManager::open(
+    PreviewSessionHandler& h, uint32_t timeout_)
+{
+    Mutex::ScopedLock l(lock);
+    std::auto_ptr<PreviewSessionState> session(
+        new PreviewSessionState(this, &h, timeout_, ack));
+    active.insert(session->getId());
+    for_each(observers.begin(), observers.end(),
+             boost::bind(&Observer::opened, _1,boost::ref(*session)));
+    return session;
+}
+
+void  PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) {
+    Mutex::ScopedLock l(lock);
+    active.erase(session->getId());
+    session->suspend();
+    session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+    if (session->mgmtObject.get() != 0)
+        session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
+    suspended.push_back(session.release()); // In expiry order
+    eraseExpired();
+}
+
+std::auto_ptr<PreviewSessionState>  PreviewSessionManager::resume(const Uuid& id)
+{
+    Mutex::ScopedLock l(lock);
+    eraseExpired();
+    if (active.find(id) != active.end()) 
+        throw SessionBusyException(
+            QPID_MSG("Session already active: " << id));
+    Suspended::iterator i = std::find_if(
+        suspended.begin(), suspended.end(),
+        boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
+    );
+    if (i == suspended.end())
+        throw InvalidArgumentException(
+            QPID_MSG("No suspended session with id=" << id));
+    active.insert(id);
+    std::auto_ptr<PreviewSessionState> state(suspended.release(i).release());
+    return state;
+}
+
+void PreviewSessionManager::erase(const framing::Uuid& id)
+{
+    Mutex::ScopedLock l(lock);
+    active.erase(id);
+}
+
+void PreviewSessionManager::eraseExpired() {
+    // Called with lock held.
+    if (!suspended.empty()) {
+        Suspended::iterator keep = std::lower_bound(
+            suspended.begin(), suspended.end(), now(),
+            boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
+        if (suspended.begin() != keep) {
+            QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+            suspended.erase(suspended.begin(), keep);
+        }
+    }
+}
+
+void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) {
+    observers.push_back(o);
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h Mon Feb 25 08:56:29 2008
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
+#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/Uuid.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <set>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+class PreviewSessionState;
+class PreviewSessionHandler;
+
+/**
+ * Create and manage PreviewSessionState objects.
+ */
+class PreviewSessionManager : private boost::noncopyable {
+  public:
+    /**
+     * Observer notified of PreviewSessionManager events.
+     */
+    struct Observer : public RefCounted {
+        virtual void opened(PreviewSessionState&) {}
+    };
+    
+    PreviewSessionManager(uint32_t ack);
+    
+    ~PreviewSessionManager();
+    
+    /** Open a new active session, caller takes ownership */
+    std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
+    
+    /** Suspend a session, start it's timeout counter.
+     * The factory takes ownership.
+     */
+    void suspend(std::auto_ptr<PreviewSessionState> session);
+        
+    /** Resume a suspended session.
+     *@throw Exception if timed out or non-existant.
+     */
+    std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
+
+    /** Add an Observer. */
+    void add(const intrusive_ptr<Observer>&);
+    
+  private:
+    typedef boost::ptr_vector<PreviewSessionState> Suspended;
+    typedef std::set<framing::Uuid> Active;
+    typedef std::vector<intrusive_ptr<Observer> > Observers;
+
+    void erase(const framing::Uuid&);             
+    void eraseExpired();             
+
+    sys::Mutex lock;
+    Suspended suspended;
+    Active active;
+    uint32_t ack;
+    Observers observers;
+    
+  friend class PreviewSessionState; // removes deleted sessions from active set.
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+
+
+#endif  /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp Mon Feb 25 08:56:29 2008
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewSessionState.h"
+#include "PreviewSessionManager.h"
+#include "PreviewSessionHandler.h"
+#include "ConnectionState.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+PreviewSessionState::PreviewSessionState(
+    PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) 
+    : framing::SessionState(ack, timeout_ > 0),
+      factory(f), handler(h), id(true), timeout(timeout_),
+      broker(h->getConnection().broker),
+      version(h->getConnection().getVersion()),
+      semanticHandler(new SemanticHandler(*this))
+{
+    in.next = semanticHandler.get();
+    out.next = &handler->out;
+
+    getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+    Manageable* parent = broker.GetVhostObject ();
+
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+        if (agent.get () != 0)
+        {
+            mgmtObject = management::Session::shared_ptr
+                (new management::Session (this, parent, id.str ()));
+            mgmtObject->set_attached (1);
+            mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h->getChannel());
+            mgmtObject->set_detachedLifespan (getTimeout());
+            agent->addObject (mgmtObject);
+        }
+    }
+}
+
+PreviewSessionState::~PreviewSessionState() {
+    // Remove ID from active session list.
+    if (factory)
+        factory->erase(getId());
+    if (mgmtObject.get () != 0)
+        mgmtObject->resourceDestroy ();
+}
+
+PreviewSessionHandler* PreviewSessionState::getHandler() {
+    return handler;
+}
+
+AMQP_ClientProxy& PreviewSessionState::getProxy() {
+    assert(isAttached());
+    return getHandler()->getProxy();
+}
+
+ConnectionState& PreviewSessionState::getConnection() {
+    assert(isAttached());
+    return getHandler()->getConnection();
+}
+
+void PreviewSessionState::detach() {
+    getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+    Mutex::ScopedLock l(lock);
+    handler = 0; out.next = 0; 
+    if (mgmtObject.get() != 0)
+    {
+        mgmtObject->set_attached  (0);
+    }
+}
+
+void PreviewSessionState::attach(PreviewSessionHandler& h) {
+    {
+        Mutex::ScopedLock l(lock);
+        handler = &h;
+        out.next = &handler->out;
+        if (mgmtObject.get() != 0)
+        {
+            mgmtObject->set_attached (1);
+            mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h.getChannel());
+        }
+    }
+    h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
+
+void PreviewSessionState::activateOutput()
+{
+    Mutex::ScopedLock l(lock);
+    if (isAttached()) {
+        getConnection().outputTasks.activateOutput();
+    }
+}
+    //This class could be used as the callback for queue notifications
+    //if not attached, it can simply ignore the callback, else pass it
+    //on to the connection
+
+ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const
+{
+    return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId,
+                                                     Args&    /*args*/)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    switch (methodId)
+    {
+    case management::Session::METHOD_DETACH :
+        if (handler != 0)
+        {
+            handler->detach();
+        }
+        status = Manageable::STATUS_OK;
+        break;
+
+    case management::Session::METHOD_CLOSE :
+        /*
+        if (handler != 0)
+        {
+            handler->getConnection().closeChannel(handler->getChannel());
+        }
+        status = Manageable::STATUS_OK;
+        break;
+        */
+
+    case management::Session::METHOD_SOLICITACK :
+    case management::Session::METHOD_RESETLIFESPAN :
+        status = Manageable::STATUS_NOT_IMPLEMENTED;
+        break;
+    }
+
+    return status;
+}
+
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h?rev=630934&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h Mon Feb 25 08:56:29 2008
@@ -0,0 +1,124 @@
+#ifndef QPID_BROKER_PREVIEWSESSION_H
+#define QPID_BROKER_PREVIEWSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <set>
+#include <vector>
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class SemanticHandler;
+class PreviewSessionHandler;
+class PreviewSessionManager;
+class Broker;
+class ConnectionState;
+
+/**
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state. 
+ */
+class PreviewSessionState : public framing::SessionState,
+    public SessionContext,
+    public framing::FrameHandler::Chains,
+    public management::Manageable
+{
+  public:
+    ~PreviewSessionState();
+    bool isAttached() { return handler; }
+
+    void detach();
+    void attach(PreviewSessionHandler& handler);
+
+    
+    PreviewSessionHandler* getHandler();
+
+    /** @pre isAttached() */
+    framing::AMQP_ClientProxy& getProxy();
+    
+    /** @pre isAttached() */
+    ConnectionState& getConnection();
+
+    uint32_t getTimeout() const { return timeout; }
+    Broker& getBroker() { return broker; }
+    framing::ProtocolVersion getVersion() const { return version; }
+
+    /** OutputControl **/
+    void activateOutput();
+
+    // Manageable entry points
+    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::Manageable::status_t
+        ManagementMethod (uint32_t methodId, management::Args& args);
+
+    // Normally SessionManager creates sessions.
+    PreviewSessionState(PreviewSessionManager*,
+                 PreviewSessionHandler* out,
+                 uint32_t timeout,
+                 uint32_t ackInterval);
+    
+
+  private:
+    PreviewSessionManager* factory;
+    PreviewSessionHandler* handler;    
+    framing::Uuid id;
+    uint32_t timeout;
+    sys::AbsTime expiry;        // Used by SessionManager.
+    Broker& broker;
+    framing::ProtocolVersion version;
+    sys::Mutex lock;
+    boost::scoped_ptr<SemanticHandler> semanticHandler;
+    management::Session::shared_ptr mgmtObject;
+
+  friend class PreviewSessionManager;
+};
+
+
+inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) {
+    return out << session.getId();
+}
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!QPID_BROKER_SESSION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Mon Feb 25 08:56:29 2008
@@ -22,7 +22,6 @@
 #include "SemanticHandler.h"
 #include "SemanticState.h"
 #include "SessionContext.h"
-#include "SessionState.h"
 #include "BrokerAdapter.h"
 #include "MessageDelivery.h"
 #include "qpid/framing/ExecutionCompleteBody.h"
@@ -37,9 +36,9 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-SemanticHandler::SemanticHandler(SessionState& s) : 
+SemanticHandler::SemanticHandler(SessionContext& s) : 
     state(*this,s), session(s),
-    msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
+    msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
     ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
  {}
 
@@ -164,13 +163,8 @@
 
 DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
 {
-    SessionContext* handler = session.getHandler();
-    if (handler) {
-        uint32_t maxFrameSize = handler->getConnection().getFrameMax();
-        MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
-    } else {
-        QPID_LOG(error, "Dropping message as session is no longer attached to a channel.");        
-    }
+    uint32_t maxFrameSize = session.getConnection().getFrameMax();
+    MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
     return outgoing.hwm;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Mon Feb 25 08:56:29 2008
@@ -46,7 +46,7 @@
 
 namespace broker {
 
-class SessionState;
+class SessionContext;
 
 class SemanticHandler : public DeliveryAdapter,
                         public framing::FrameHandler,
@@ -56,7 +56,7 @@
     typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;    
 
     SemanticState state;
-    SessionState& session;
+    SessionContext& session;
     // TODO aconway 2007-09-20: Why are these on the handler rather than the
     // state?
     IncomingExecutionContext incoming;
@@ -78,10 +78,10 @@
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
     //Connection& getConnection() { return session.getConnection(); }
-    Broker& getBroker() { return session.getBroker(); }
+    Broker& getBroker() { return session.getConnection().getBroker(); }
 
 public:
-    SemanticHandler(SessionState& session);
+    SemanticHandler(SessionContext& session);
 
     //frame handler:
     void handle(framing::AMQFrame& frame);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 25 08:56:29 2008
@@ -19,7 +19,7 @@
  *
  */
 
-#include "SessionState.h"
+#include "SessionContext.h"
 #include "BrokerAdapter.h"
 #include "Queue.h"
 #include "Connection.h"
@@ -56,7 +56,7 @@
 using namespace qpid::sys;
 using namespace qpid::ptr_map;
 
-SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
     : session(ss),
       deliveryAdapter(da),
       prefetchSize(0),
@@ -263,21 +263,16 @@
 
 bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
 {
-    if (parent->getSession().isAttached() && accept(msg.payload)) {
-        allocateCredit(msg.payload);
-        DeliveryId deliveryTag =
-            parent->deliveryAdapter.deliver(msg, token);
-        if (windowing || ackExpected) {
-            parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
-        } 
-        if (acquire && !ackExpected) {
-            queue->dequeue(0, msg.payload);
-        }
-        return true;
-    } else {
-        QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
-        return false;
+    allocateCredit(msg.payload);
+    DeliveryId deliveryTag =
+        parent->deliveryAdapter.deliver(msg, token);
+    if (windowing || ackExpected) {
+        parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+    } 
+    if (acquire && !ackExpected) {
+        queue->dequeue(0, msg.payload);
     }
+    return true;
 }
 
 bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
@@ -331,7 +326,7 @@
     if(queue) {
         queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
-            Queue::tryAutoDelete(getSession().getBroker(), queue);
+            Queue::tryAutoDelete(getSession().getConnection().getBroker(), queue);
         }
     }
 }
@@ -584,7 +579,7 @@
     if (name.empty()) {
         throw NotAllowedException(QPID_MSG("No queue name specified."));
     } else {
-        queue = session.getBroker().getQueues().find(name);
+        queue = session.getConnection().getBroker().getQueues().find(name);
         if (!queue)
             throw NotFoundException(QPID_MSG("Queue not found: "<<name));
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Feb 25 08:56:29 2008
@@ -45,7 +45,7 @@
 namespace qpid {
 namespace broker {
 
-class SessionState;
+class SessionContext;
 
 /**
  * SemanticState holds the L3 and L4 state of an open session, whether
@@ -98,7 +98,7 @@
     typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
-    SessionState& session;
+    SessionContext& session;
     DeliveryAdapter& deliveryAdapter;
     Queue::shared_ptr defaultQueue;
     ConsumerImplMap consumers;
@@ -129,10 +129,10 @@
     void cancel(ConsumerImpl&);
 
   public:
-    SemanticState(DeliveryAdapter&, SessionState&);
+    SemanticState(DeliveryAdapter&, SessionContext&);
     ~SemanticState();
 
-    SessionState& getSession() { return session; }
+    SessionContext& getSession() { return session; }
     
     /**
      * Get named queue, never returns 0.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Feb 25 08:56:29 2008
@@ -25,6 +25,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/OutputControl.h"
 #include "ConnectionState.h"
 
 
@@ -33,17 +34,12 @@
 namespace qpid {
 namespace broker {
 
-class SessionContext : public framing::FrameHandler::InOutHandler                       
+class SessionContext : public sys::OutputControl
 {
   public:
-    SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
     virtual ~SessionContext(){}
     virtual ConnectionState& getConnection() = 0;
-    virtual const ConnectionState& getConnection() const = 0;
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
-    virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
-    virtual void detach() = 0;
-    virtual framing::ChannelId getChannel() const = 0;
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon Feb 25 08:56:29 2008
@@ -21,6 +21,7 @@
 #include "SessionHandler.h"
 #include "SessionState.h"
 #include "Connection.h"
+#include "ConnectionState.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
 #include "qpid/framing/ClientInvoker.h"
@@ -36,7 +37,7 @@
 using namespace qpid::sys;
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
-    : SessionContext(c.getOutput()),
+    : InOutHandler(0, &out),
       connection(c), channel(ch, &c.getOutput()),
       proxy(out),               // Via my own handleOut() for L2 data.
       peerSession(channel),     // Direct to channel for L2 commands.
@@ -58,18 +59,22 @@
     //
     AMQMethodBody* m = f.getBody()->getMethod();
     try {
-        if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
-            return;
-        } else if (session.get()) {
-            boost::optional<SequenceNumber> ack=session->received(f);
-            session->in.handle(f);
-            if (ack)
-                peerSession.ack(*ack, SequenceNumberSet());
-        } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
-            return;
-        } else if (!ignoring) {
-            throw ChannelErrorException(
-                QPID_MSG("Channel " << channel.get() << " is not open"));
+        if (!ignoring) {
+            if (m && 
+                (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || 
+                 invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+                return;
+            } else if (session.get()) {
+                boost::optional<SequenceNumber> ack=session->received(f);
+                session->handle(f);
+                if (ack)
+                    peerSession.ack(*ack, SequenceNumberSet());
+            } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+                return;
+            } else {
+                throw ChannelErrorException(
+                    QPID_MSG("Channel " << channel.get() << " is not open"));
+            }
         }
     } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
@@ -91,10 +96,12 @@
 }
 
 void SessionHandler::assertAttached(const char* method) const {
-    if (!session.get())
+    if (!session.get()) {
+        std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
         throw ChannelErrorException(
             QPID_MSG(method << " failed: No session for channel "
                      << getChannel()));
+    }
 }
 
 void SessionHandler::assertClosed(const char* method) const {
@@ -207,5 +214,33 @@
 
 ConnectionState& SessionHandler::getConnection() { return connection; }
 const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) 
+{
+    assertAttached("complete");
+    session->complete(cumulative, range);
+}
+
+void SessionHandler::flush()
+{
+    assertAttached("flush");
+    session->flush();
+}
+void SessionHandler::sync()
+{
+    assertAttached("sync");
+    session->sync();
+}
+
+void SessionHandler::noop()
+{
+    assertAttached("noop");
+    session->noop();
+}
+
+void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+    //never actually sent by client at present
+}
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Mon Feb 25 08:56:29 2008
@@ -28,7 +28,7 @@
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
+#include "qpid/framing/SequenceNumber.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -36,16 +36,18 @@
 namespace broker {
 
 class Connection;
+class ConnectionState;
 class SessionState;
 
 /**
  * A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
+ * receives incoming frames, handles session controls and manages the
  * association between the channel and a session.
  */
 class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
                        public framing::AMQP_ClientOperations::SessionHandler,
-                       public SessionContext,
+                       public framing::AMQP_ServerOperations::ExecutionHandler,
+                       public framing::FrameHandler::InOutHandler,
                        private boost::noncopyable
 {
   public:
@@ -90,11 +92,16 @@
     void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
     void detached();
 
+    //Execution methods:
+    void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);    
+    void flush();
+    void noop();
+    void result(uint32_t command, const std::string& data);
+    void sync();
 
     void assertAttached(const char* method) const;
     void assertActive(const char* method) const;
     void assertClosed(const char* method) const;
-
 
     Connection& connection;
     framing::ChannelHandler channel;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Mon Feb 25 08:56:29 2008
@@ -45,7 +45,7 @@
 
 // FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
 std::auto_ptr<SessionState>  SessionManager::open(
-    SessionContext& h, uint32_t timeout_)
+    SessionHandler& h, uint32_t timeout_)
 {
     Mutex::ScopedLock l(lock);
     std::auto_ptr<SessionState> session(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Mon Feb 25 08:56:29 2008
@@ -38,7 +38,7 @@
 namespace broker {
 
 class SessionState;
-class SessionContext;
+class SessionHandler;
 
 /**
  * Create and manage SessionState objects.
@@ -57,7 +57,7 @@
     ~SessionManager();
     
     /** Open a new active session, caller takes ownership */
-    std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
+    std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_);
     
     /** Suspend a session, start it's timeout counter.
      * The factory takes ownership.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb 25 08:56:29 2008
@@ -19,12 +19,16 @@
  *
  */
 #include "SessionState.h"
-#include "SessionManager.h"
-#include "SessionContext.h"
-#include "ConnectionState.h"
 #include "Broker.h"
+#include "ConnectionState.h"
+#include "MessageDelivery.h"
 #include "SemanticHandler.h"
+#include "SessionManager.h"
+#include "SessionHandler.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace broker {
@@ -37,17 +41,17 @@
 using qpid::management::Args;
 
 SessionState::SessionState(
-    SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) 
+    SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) 
     : framing::SessionState(ack, timeout_ > 0),
       factory(f), handler(h), id(true), timeout(timeout_),
       broker(h->getConnection().broker),
       version(h->getConnection().getVersion()),
-      semanticHandler(new SemanticHandler(*this))
+      semanticState(*this, *this),
+      adapter(semanticState),
+      msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+      ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
 {
-    in.next = semanticHandler.get();
-    out.next = &handler->out;
-
-    getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+    getConnection().outputTasks.addOutputTask(&semanticState);
 
     Manageable* parent = broker.GetVhostObject ();
 
@@ -76,7 +80,7 @@
         mgmtObject->resourceDestroy ();
 }
 
-SessionContext* SessionState::getHandler() {
+SessionHandler* SessionState::getHandler() {
     return handler;
 }
 
@@ -91,20 +95,19 @@
 }
 
 void SessionState::detach() {
-    getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+    getConnection().outputTasks.removeOutputTask(&semanticState);
     Mutex::ScopedLock l(lock);
-    handler = 0; out.next = 0; 
+    handler = 0;
     if (mgmtObject.get() != 0)
     {
         mgmtObject->set_attached  (0);
     }
 }
 
-void SessionState::attach(SessionContext& h) {
+void SessionState::attach(SessionHandler& h) {
     {
         Mutex::ScopedLock l(lock);
         handler = &h;
-        out.next = &handler->out;
         if (mgmtObject.get() != 0)
         {
             mgmtObject->set_attached (1);
@@ -112,7 +115,7 @@
             mgmtObject->set_channelId (h.getChannel());
         }
     }
-    h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+    h.getConnection().outputTasks.addOutputTask(&semanticState);
 }
 
 void SessionState::activateOutput()
@@ -163,6 +166,101 @@
     }
 
     return status;
+}
+
+void SessionState::handleCommand(framing::AMQMethodBody* method)
+{
+    SequenceNumber id = incoming.next();
+    Invoker::Result invocation = invoke(adapter, *method);
+    incoming.complete(id);                                    
+    
+    if (!invocation.wasHandled()) {
+        throw NotImplementedException("Not implemented");
+    } else if (invocation.hasResult()) {
+        getProxy().getExecution().result(id.getValue(), invocation.getResult());
+    }
+    if (method->isSync()) { 
+        incoming.sync(id); 
+        sendCompletion(); 
+    }
+    //TODO: if window gets too large send unsolicited completion
+}
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+    intrusive_ptr<Message> msg(msgBuilder.getMessage());
+    if (!msg) {//start of frameset will be indicated by frame flags
+        msgBuilder.start(incoming.next());
+        msg = msgBuilder.getMessage();
+    }
+    msgBuilder.handle(frame);
+    if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+        msg->setPublisher(&getConnection());
+        semanticState.handle(msg);        
+        msgBuilder.end();
+        incoming.track(msg);
+        if (msg->getFrames().getMethod()->isSync()) { 
+            incoming.sync(msg->getCommandId()); 
+            sendCompletion(); 
+        }
+    }
+}
+
+void SessionState::handle(AMQFrame& frame)
+{
+    //TODO: make command handling more uniform, regardless of whether
+    //commands carry content. (For now, assume all single frame
+    //assmblies are non-content bearing and all content-bearing
+    //assmeblies will have more than one frame):
+    if (frame.getBof() && frame.getEof()) {
+        handleCommand(frame.getMethod());
+    } else {
+        handleContent(frame);
+    }
+
+}
+
+DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+{
+    uint32_t maxFrameSize = getConnection().getFrameMax();
+    MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
+    return outgoing.hwm;
+}
+
+void SessionState::sendCompletion()
+{
+    SequenceNumber mark = incoming.getMark();
+    SequenceNumberSet range = incoming.getRange();
+    getProxy().getExecution().complete(mark.getValue(), range);
+}
+
+void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) 
+{
+    //record: 
+    SequenceNumber mark(cumulative);
+    if (outgoing.lwm < mark) {
+        outgoing.lwm = mark;
+        //ack messages:
+        semanticState.ackCumulative(mark.getValue());
+    }
+    range.processRanges(ackOp);
+}
+
+void SessionState::flush()
+{
+    incoming.flush();
+    sendCompletion();
+}
+
+void SessionState::sync()
+{
+    incoming.sync();
+    sendCompletion();
+}
+
+void SessionState::noop()
+{
+    incoming.noop();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb 25 08:56:29 2008
@@ -27,10 +27,15 @@
 #include "qpid/framing/SessionState.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/OutputControl.h"
 #include "qpid/sys/Time.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Session.h"
+#include "BrokerAdapter.h"
+#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
+#include "SessionContext.h"
+#include "SemanticState.h"
+#include "IncomingExecutionContext.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -47,8 +52,7 @@
 
 namespace broker {
 
-class SemanticHandler;
-class SessionContext;
+class SessionHandler;
 class SessionManager;
 class Broker;
 class ConnectionState;
@@ -58,8 +62,8 @@
  * themselves have state. 
  */
 class SessionState : public framing::SessionState,
-    public framing::FrameHandler::Chains,
-    public sys::OutputControl,
+    public SessionContext,
+    public DeliveryAdapter,
     public management::Manageable
 {
   public:
@@ -67,10 +71,10 @@
     bool isAttached() { return handler; }
 
     void detach();
-    void attach(SessionContext& handler);
+    void attach(SessionHandler& handler);
 
     
-    SessionContext* getHandler();
+    SessionHandler* getHandler();
 
     /** @pre isAttached() */
     framing::AMQP_ClientProxy& getProxy();
@@ -85,6 +89,19 @@
     /** OutputControl **/
     void activateOutput();
 
+    void handle(framing::AMQFrame& frame);
+    void handleCommand(framing::AMQMethodBody* method);
+    void handleContent(framing::AMQFrame& frame);
+
+    void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);    
+    void flush();
+    void noop();
+    void sync();
+    void sendCompletion();
+
+    //delivery adapter methods:
+    DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
+
     // Manageable entry points
     management::ManagementObject::shared_ptr GetManagementObject (void) const;
     management::Manageable::status_t
@@ -92,21 +109,32 @@
 
     // Normally SessionManager creates sessions.
     SessionState(SessionManager*,
-                 SessionContext* out,
+                 SessionHandler* out,
                  uint32_t timeout,
                  uint32_t ackInterval);
     
 
   private:
+    typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;    
+
     SessionManager* factory;
-    SessionContext* handler;    
+    SessionHandler* handler;    
     framing::Uuid id;
     uint32_t timeout;
     sys::AbsTime expiry;        // Used by SessionManager.
     Broker& broker;
     framing::ProtocolVersion version;
     sys::Mutex lock;
-    boost::scoped_ptr<SemanticHandler> semanticHandler;
+
+    SemanticState semanticState;
+    BrokerAdapter adapter;
+    MessageBuilder msgBuilder;
+
+    //execution state
+    IncomingExecutionContext incoming;
+    framing::Window outgoing;
+    RangedOperation ackOp;
+
     management::Session::shared_ptr mgmtObject;
 
   friend class SessionManager;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb 25 08:56:29 2008
@@ -17,7 +17,7 @@
  */
 
 #include "Cluster.h"
-#include "qpid/broker/SessionState.h"
+#include "qpid/broker/PreviewSessionState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
-using broker::SessionState;
+using broker::PreviewSessionState;
 
 namespace {
 
 // Beginning of inbound chain: send to cluster.
 struct ClusterSendHandler : public FrameHandler {
-    SessionState& session;
+    PreviewSessionState& session;
     Cluster& cluster;
     bool busy;
     Monitor lock;
     
-    ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+    ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
 
     void handle(AMQFrame& f) {
         Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@
     c.next = h;
 }
 
-struct SessionObserver : public broker::SessionManager::Observer {
+struct SessionObserver : public broker::PreviewSessionManager::Observer {
     Cluster& cluster;
     SessionObserver(Cluster& c) : cluster(c) {}
     
-    void opened(SessionState& s) {
+    void opened(PreviewSessionState& s) {
         // FIXME aconway 2008-01-29: IList for memory management.
         ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
         ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb 25 08:56:29 2008
@@ -62,7 +62,7 @@
     virtual ~Cluster();
 
     // FIXME aconway 2008-01-29: 
-    intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+    intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
     
     /** Get the current cluster membership. */
     MemberList getMembers() const;
@@ -116,7 +116,7 @@
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
-    intrusive_ptr<broker::SessionManager::Observer> observer;
+    intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb 25 08:56:29 2008
@@ -69,7 +69,7 @@
             cluster = boost::in_place(options.name,
                                       options.getUrl(broker->getPort()),
                                       boost::ref(*broker));
-            broker->getSessionManager().add(cluster->getObserver());	
+            broker->getPreviewSessionManager().add(cluster->getObserver());	
         }
     }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h Mon Feb 25 08:56:29 2008
@@ -33,6 +33,7 @@
 namespace framing {
 
 static ProtocolVersion highestProtocolVersion(99, 0);
+//static ProtocolVersion highestProtocolVersion(0, 10);
 
 } /* namespace framing */
 } /* namespace qpid */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h Mon Feb 25 08:56:29 2008
@@ -39,6 +39,7 @@
     void send(const AMQBody&);
 
     ProtocolVersion getVersion() const;
+    FrameHandler& getHandler() { return out; }
 
   protected:
     FrameHandler& out;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=630934&r1=630933&r2=630934&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Feb 25 08:56:29 2008
@@ -92,7 +92,7 @@
     BOOST_CHECK_THROW(session.close(), InternalErrorException);    
 }
 
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) {
     BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
 }