You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/02/21 18:40:49 UTC

svn commit: r629883 [1/2] - in /incubator/qpid/trunk/qpid: cpp/rubygen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/ specs/

Author: gsim
Date: Thu Feb 21 09:40:42 2008
New Revision: 629883

URL: http://svn.apache.org/viewvc?rev=629883&view=rev
Log:
Start moving towards final 0-10 spec:
* marked preview spec as 99-0 to distinguish it from 0-10 (which will now be used for the final version)
* modified python client to treat 99-0 as 0-10 for now
* modified broker to have two paths for the two different versions: 99-0 uses PreviewConnection, PreviewConnectionHandler 
  and PreviewSessionHandler which are straight copy & pastes of the Connection, ConnectionHandler and SessionHandler now
  associated with 0-10 (so we can migrate the 0-10 path to the final spec without affecting clients working with the preview
  version)
 

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Thu Feb 21 09:40:42 2008
@@ -85,8 +85,8 @@
           }
           cpp_class(@classname, "public SessionBase") {
             public
-            genl "Session_0_10() {}"
-            genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}"
+            genl "Session_#{@amqp.version.bars}() {}"
+            genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}"
             session_methods.each { |m|
               genl
               doxygen(m)

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Feb 21 09:40:42 2008
@@ -163,6 +163,9 @@
   qpid/broker/Queue.cpp \
   qpid/broker/PersistableMessage.cpp \
   qpid/broker/Bridge.cpp \
+  qpid/broker/PreviewConnection.cpp \
+  qpid/broker/PreviewConnectionHandler.cpp \
+  qpid/broker/PreviewSessionHandler.cpp \
   qpid/broker/Connection.cpp \
   qpid/broker/ConnectionHandler.cpp \
   qpid/broker/ConnectionFactory.cpp \
@@ -186,6 +189,7 @@
   qpid/broker/MessageDelivery.cpp \
   qpid/broker/MessageHandlerImpl.cpp \
   qpid/broker/MessageStoreModule.cpp \
+  qpid/broker/MultiVersionConnectionInputHandler.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/QueueBindings.cpp \
@@ -201,6 +205,7 @@
   qpid/broker/SessionManager.h \
   qpid/broker/SessionManager.cpp \
   qpid/broker/SessionHandler.h \
+  qpid/broker/SessionContext.h \
   qpid/broker/SessionHandler.cpp \
   qpid/broker/SemanticHandler.cpp \
   qpid/broker/Timer.cpp \
@@ -262,7 +267,11 @@
   qpid/broker/Queue.h \
   qpid/broker/BrokerSingleton.h \
   qpid/broker/Bridge.h \
+  qpid/broker/PreviewConnection.h \
+  qpid/broker/PreviewConnectionHandler.h \
+  qpid/broker/PreviewSessionHandler.h \
   qpid/broker/Connection.h \
+  qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionFactory.h \
   qpid/broker/ConnectionHandler.h \
   qpid/broker/ConnectionToken.h \
@@ -293,6 +302,7 @@
   qpid/broker/MessageHandlerImpl.h \
   qpid/broker/MessageStore.h \
   qpid/broker/MessageStoreModule.h \
+  qpid/broker/MultiVersionConnectionInputHandler.h \
   qpid/broker/NameGenerator.h \
   qpid/broker/NullMessageStore.h \
   qpid/broker/Persistable.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Feb 21 09:40:42 2008
@@ -19,7 +19,7 @@
  *
  */
 #include "Bridge.h"
-#include "Connection.h"
+#include "ConnectionState.h"
 
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/FieldTable.h"
@@ -31,7 +31,7 @@
 namespace qpid {
 namespace broker {
 
-Bridge::Bridge(framing::ChannelId id, Connection& c, CancellationListener l, const management::ArgsLinkBridge& _args) : 
+Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : 
     args(_args), channel(id, &(c.getOutput())), peer(channel), 
     mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
     connection(c), listener(l)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Feb 21 09:40:42 2008
@@ -32,14 +32,14 @@
 namespace qpid {
 namespace broker {
 
-class Connection;
+class ConnectionState;
 
 class Bridge : public management::Manageable
 {
 public:
     typedef boost::function<void(Bridge*)> CancellationListener;
 
-    Bridge(framing::ChannelId id, Connection& c, CancellationListener l,
+    Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
            const management::ArgsLinkBridge& args);
     ~Bridge();
 
@@ -54,7 +54,7 @@
     framing::ChannelHandler channel;
     framing::AMQP_ServerProxy peer;
     management::Bridge::shared_ptr mgmtObject;
-    Connection& connection;
+    ConnectionState& connection;
     CancellationListener listener;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Feb 21 09:40:42 2008
@@ -86,13 +86,7 @@
 
 
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
-    broker(broker_),
-    outputTasks(*out_),
-    out(out_),
-    framemax(65535), 
-    heartbeat(0),
-    client(0),
-    stagingThreshold(broker.getStagingThreshold()),
+    ConnectionState(out_, broker_),
     adapter(*this),
     mgmtClosing(0),
     mgmtId(mgmtId_)
@@ -226,17 +220,6 @@
     }
 
     return status;
-}
-
-void Connection::setUserId(const string& uid)
-{
-    userId = uid;
-    QPID_LOG (debug, "UserId is " << userId);
-}
-
-const string& Connection::getUserId() const
-{
-    return userId;
 }
 
 Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Feb 21 09:40:42 2008
@@ -39,6 +39,7 @@
 #include "qpid/sys/Socket.h"
 #include "qpid/Exception.h"
 #include "ConnectionHandler.h"
+#include "ConnectionState.h"
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Client.h"
@@ -50,8 +51,7 @@
 namespace broker {
 
 class Connection : public sys::ConnectionInputHandler, 
-                   public ConnectionToken,
-                   public management::Manageable
+                   public ConnectionState
 {
   public:
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
@@ -63,25 +63,6 @@
     /** Close the connection */
     void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
 
-    sys::ConnectionOutputHandler& getOutput() const { return *out; }
-    framing::ProtocolVersion getVersion() const { return version; }
-
-    uint32_t getFrameMax() const { return framemax; }
-    uint16_t getHeartbeat() const { return heartbeat; }
-    uint64_t getStagingThreshold() const { return stagingThreshold; }
-
-    void setFrameMax(uint32_t fm) { framemax = fm; }
-    void setHeartbeat(uint16_t hb) { heartbeat = hb; }
-    void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
-    
-    Broker& getBroker() { return broker; }
-
-    Broker& broker;
-    std::vector<Queue::shared_ptr> exclusiveQueues;
-    
-    //contained output tasks
-    sys::AggregateOutput outputTasks;
-
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
     void initiated(const framing::ProtocolInitiation& header);
@@ -98,9 +79,6 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
 
-    void setUserId(const string& uid);
-    const string& getUserId() const;
-
     void initMgmt(bool asLink = false);
 
   private:
@@ -126,17 +104,11 @@
     class MgmtClient;
     class MgmtLink;
 
-    framing::ProtocolVersion version;
     ChannelMap channels;
-    sys::ConnectionOutputHandler* out;
-    uint32_t framemax;
-    uint16_t heartbeat;
     framing::AMQP_ClientProxy::Connection* client;
-    uint64_t stagingThreshold;
     ConnectionHandler adapter;
     std::auto_ptr<MgmtWrapper> mgmtWrapper;
     bool mgmtClosing;
-    string userId;
     const std::string mgmtId;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Thu Feb 21 09:40:42 2008
@@ -20,6 +20,7 @@
  */
 #include "ConnectionFactory.h"
 #include "Connection.h"
+#include "MultiVersionConnectionInputHandler.h"
 
 namespace qpid {
 namespace broker {
@@ -38,7 +39,7 @@
 ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
                           const std::string& id)
 {
-    return new Connection(out, broker, id);
+    return new MultiVersionConnectionInputHandler(out, broker, id);
 }
 
 }} // namespace qpid::broker

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionState_
+#define _ConnectionState_
+
+#include <vector>
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/management/Manageable.h"
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class ConnectionState : public ConnectionToken, public management::Manageable
+{
+  public:
+    ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : 
+        broker(b), 
+        outputTasks(*o),
+        out(o), 
+        framemax(65535), 
+        heartbeat(0),
+        stagingThreshold(broker.getStagingThreshold())
+        {}
+
+
+
+    virtual ~ConnectionState () {}
+
+    uint32_t getFrameMax() const { return framemax; }
+    uint16_t getHeartbeat() const { return heartbeat; }
+    uint64_t getStagingThreshold() const { return stagingThreshold; }
+
+    void setFrameMax(uint32_t fm) { framemax = fm; }
+    void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+    void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+
+    void setUserId(const string& uid) {  userId = uid; }
+    const string& getUserId() const { return userId; }
+    
+    Broker& getBroker() { return broker; }
+
+    Broker& broker;
+    std::vector<Queue::shared_ptr> exclusiveQueues;
+    
+    //contained output tasks
+    sys::AggregateOutput outputTasks;
+
+    sys::ConnectionOutputHandler& getOutput() const { return *out; }
+    framing::ProtocolVersion getVersion() const { return version; }
+
+  protected:
+    framing::ProtocolVersion version;
+    sys::ConnectionOutputHandler* out;
+    uint32_t framemax;
+    uint16_t heartbeat;
+    uint64_t stagingThreshold;
+    string userId;
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Thu Feb 21 09:40:42 2008
@@ -21,6 +21,7 @@
 
 #include "SemanticState.h"
 #include "SessionState.h"
+#include "ConnectionState.h"
 
 namespace qpid {
 namespace broker {
@@ -39,7 +40,7 @@
     HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
-    Connection& getConnection() { return session.getConnection(); }
+    ConnectionState& getConnection() { return session.getConnection(); }
     Broker& getBroker() { return session.getBroker(); }
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MultiVersionConnectionInputHandler.h"
+#include "Connection.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler(
+    qpid::sys::ConnectionOutputHandler* _out, 
+    Broker& _broker, 
+    const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {}
+
+    
+void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i)
+{
+    if (i.getMajor() == 99 && i.getMinor() == 0) {
+        handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id));
+    } else if (i.getMajor() == 0 && i.getMinor() == 10) {
+        handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id));
+    } else {
+        throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString());        
+    }
+    handler->initiated(i);
+}
+
+void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
+{
+    check();
+    handler->received(f);
+}
+
+void MultiVersionConnectionInputHandler::idleOut()
+{
+    check();
+    handler->idleOut();
+}
+
+void MultiVersionConnectionInputHandler::idleIn()
+{
+    check();
+    handler->idleIn();
+}
+
+bool MultiVersionConnectionInputHandler::doOutput()
+{
+    return check(false) &&  handler->doOutput();
+}
+    
+qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
+{
+    return qpid::framing::ProtocolInitiation(linkVersion);
+}
+
+void MultiVersionConnectionInputHandler::closed()
+{
+    check();
+    handler->closed();
+}
+
+bool MultiVersionConnectionInputHandler::check(bool fail)
+{
+    if (!handler.get()) { 
+        if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
+        else return false;
+    } else {
+        return true;
+    }
+}
+
+}
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MultiVersionConnectionInputHandler_
+#define _MultiVersionConnectionInputHandler_
+
+#include <memory>
+#include <string>
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHandler
+{
+    qpid::framing::ProtocolVersion linkVersion;//version used for inter-broker links
+    std::auto_ptr<qpid::sys::ConnectionInputHandler> handler;
+    qpid::sys::ConnectionOutputHandler* out;
+    Broker& broker; 
+    const std::string id;
+
+    bool check(bool fail = true);
+
+public:
+    MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
+    virtual ~MultiVersionConnectionInputHandler() {}
+
+    void initiated(const qpid::framing::ProtocolInitiation&);
+    void received(qpid::framing::AMQFrame&);
+    void idleOut();
+    void idleIn();
+    bool doOutput();    
+    qpid::framing::ProtocolInitiation getInitiation();
+    void closed();
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnection.h"
+#include "SessionState.h"
+#include "BrokerAdapter.h"
+#include "Bridge.h"
+#include "SemanticHandler.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/ptr_map.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <algorithm>
+#include <iostream>
+#include <assert.h>
+
+using namespace boost;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper
+{
+    management::Client::shared_ptr mgmtClient;
+
+public:
+    MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+    ~MgmtClient();
+    void received(framing::AMQFrame& frame);
+    management::ManagementObject::shared_ptr getManagementObject() const;
+    void closing();
+};
+
+class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
+{
+    typedef boost::ptr_vector<Bridge> Bridges;
+
+    management::Link::shared_ptr mgmtLink;
+    Bridges created;//holds list of bridges pending creation
+    Bridges cancelled;//holds list of bridges pending cancellation
+    Bridges active;//holds active bridges
+    uint channelCounter;
+    sys::Mutex lock;
+
+    void cancel(Bridge*);
+
+public:
+    MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+    ~MgmtLink();
+    void received(framing::AMQFrame& frame);
+    management::ManagementObject::shared_ptr getManagementObject() const;
+    void closing();
+    void processPending();
+    void process(PreviewConnection& connection, const management::Args& args);
+};
+
+
+PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
+    ConnectionState(out_, broker_),
+    adapter(*this),
+    mgmtClosing(0),
+    mgmtId(mgmtId_)
+{}
+
+void PreviewConnection::initMgmt(bool asLink)
+{
+    Manageable* parent = broker.GetVhostObject ();
+
+    if (parent != 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+        if (agent.get () != 0)
+        {
+            if (asLink) {
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+            } else {
+                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+            }
+        }
+    }
+}
+
+PreviewConnection::~PreviewConnection () {}
+
+void PreviewConnection::received(framing::AMQFrame& frame){
+    if (mgmtClosing)
+        close (403, "Closed by Management Request", 0, 0);
+
+    if (frame.getChannel() == 0) {
+        adapter.handle(frame);
+    } else {
+        getChannel(frame.getChannel()).in(frame);
+    }
+    
+    if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+}
+
+void PreviewConnection::close(
+    ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+    adapter.close(code, text, classId, methodId);
+    channels.clear();
+    getOutput().close();
+}
+
+void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
+    version = ProtocolVersion(header.getMajor(), header.getMinor());
+    adapter.init(header);
+    initMgmt();
+}
+
+void PreviewConnection::idleOut(){}
+
+void PreviewConnection::idleIn(){}
+
+void PreviewConnection::closed(){ // Physically closed, suspend open sessions.
+    try {
+	for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
+	    get_pointer(i)->localSuspend();
+        while (!exclusiveQueues.empty()) {
+            Queue::shared_ptr q(exclusiveQueues.front());
+            q->releaseExclusiveOwnership();
+            if (q->canAutoDelete()) {
+                Queue::tryAutoDelete(broker, q);
+            }
+            exclusiveQueues.erase(exclusiveQueues.begin());
+        }
+    } catch(std::exception& e) {
+        QPID_LOG(error, " Unhandled exception while closing session: " <<
+                 e.what());
+        assert(0);
+    }
+}
+
+bool PreviewConnection::doOutput()
+{    
+    try{
+        //process any pending mgmt commands:
+        if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+        //then do other output as needed:
+        return outputTasks.doOutput();
+    }catch(ConnectionException& e){
+        close(e.code, e.what(), 0, 0);
+    }catch(std::exception& e){
+        close(541/*internal error*/, e.what(), 0, 0);
+    }
+    return false;
+}
+
+void PreviewConnection::closeChannel(uint16_t id) {
+    ChannelMap::iterator i = channels.find(id);
+    if (i != channels.end()) channels.erase(i);
+}
+
+PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) {
+    ChannelMap::iterator i=channels.find(id);
+    if (i == channels.end()) {
+        i = channels.insert(id, new PreviewSessionHandler(*this, id)).first;
+    }
+    return *get_pointer(i);
+}
+
+ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const
+{
+    return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+}
+
+Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
+                                                   Args&    args)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]");
+
+    switch (methodId)
+    {
+    case management::Client::METHOD_CLOSE :
+        mgmtClosing = 1;
+        if (mgmtWrapper.get()) mgmtWrapper->closing();
+        status = Manageable::STATUS_OK;
+        break;
+    case management::Link::METHOD_BRIDGE :
+        //queue this up and request chance to do output (i.e. get connections thread of control):
+        mgmtWrapper->process(*this, args);
+        out->activateOutput();
+        status = Manageable::STATUS_OK;
+        break;
+    }
+
+    return status;
+}
+
+PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) 
+    : channelCounter(1)
+{
+    mgmtLink = management::Link::shared_ptr
+        (new management::Link(conn, parent, mgmtId));
+    agent->addObject (mgmtLink);
+}
+
+PreviewConnection::MgmtLink::~MgmtLink()
+{
+    if (mgmtLink.get () != 0)
+        mgmtLink->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame)
+{
+    if (mgmtLink.get () != 0)
+    {
+        mgmtLink->inc_framesFromPeer ();
+        mgmtLink->inc_bytesFromPeer (frame.size ());
+    }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const
+{
+    return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void PreviewConnection::MgmtLink::closing()
+{
+    if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void PreviewConnection::MgmtLink::processPending()
+{
+    //process any pending creates
+    if (!created.empty()) {
+        for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+            i->create();
+        }
+        active.transfer(active.end(), created.begin(), created.end(), created);
+    }
+    if (!cancelled.empty()) {
+        //process any pending cancellations
+        for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+            i->cancel();
+        }
+        cancelled.clear();
+    }
+}
+
+void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
+{   
+    created.push_back(new Bridge(channelCounter++, connection, 
+                                 boost::bind(&MgmtLink::cancel, this, _1),
+                                 dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void PreviewConnection::MgmtLink::cancel(Bridge* b)
+{   
+    //need to take this out the active map and add it to the cancelled map
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        if (&(*i) == b) {
+            cancelled.transfer(cancelled.end(), i, active);
+            break;
+        }
+    }
+}
+
+PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+    mgmtClient = management::Client::shared_ptr
+        (new management::Client (conn, parent, mgmtId));
+    agent->addObject (mgmtClient);
+}
+
+PreviewConnection::MgmtClient::~MgmtClient()
+{
+    if (mgmtClient.get () != 0)
+        mgmtClient->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame)
+{
+    if (mgmtClient.get () != 0)
+    {
+        mgmtClient->inc_framesFromClient ();
+        mgmtClient->inc_bytesFromClient (frame.size ());
+    }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const
+{
+    return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void PreviewConnection::MgmtClient::closing()
+{
+    if (mgmtClient) mgmtClient->set_closing (1);
+}
+
+}}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnection_
+#define _PreviewConnection_
+
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "Broker.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/Exception.h"
+#include "PreviewConnectionHandler.h"
+#include "ConnectionState.h"
+#include "PreviewSessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection : public sys::ConnectionInputHandler, 
+                   public ConnectionState
+{
+  public:
+    PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
+    ~PreviewConnection ();
+
+    /** Get the PreviewSessionHandler for channel. Create if it does not already exist */
+    PreviewSessionHandler& getChannel(framing::ChannelId channel);
+
+    /** Close the connection */
+    void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+    // ConnectionInputHandler methods
+    void received(framing::AMQFrame& frame);
+    void initiated(const framing::ProtocolInitiation& header);
+    void idleOut();
+    void idleIn();
+    void closed();
+    bool doOutput();
+    framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
+
+    void closeChannel(framing::ChannelId channel);
+
+    // Manageable entry points
+    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::Manageable::status_t
+        ManagementMethod (uint32_t methodId, management::Args& args);
+
+    void initMgmt(bool asLink = false);
+
+  private:
+    typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap;
+    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+    /**
+     * Connection may appear, for the purposes of management, as a
+     * normal client initiated connection or as an agent initiated
+     * inter-broker link. This wrapper abstracts the common interface
+     * for both.
+     */
+    class MgmtWrapper
+    {
+    public:
+        virtual ~MgmtWrapper(){}
+        virtual void received(framing::AMQFrame& frame) = 0;
+        virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+        virtual void closing() = 0;
+        virtual void processPending(){}
+        virtual void process(PreviewConnection&, const management::Args&){}
+    };
+    class MgmtClient;
+    class MgmtLink;
+
+    ChannelMap channels;
+    framing::AMQP_ClientProxy::Connection* client;
+    uint64_t stagingThreshold;
+    PreviewConnectionHandler adapter;
+    std::auto_ptr<MgmtWrapper> mgmtWrapper;
+    bool mgmtClosing;
+    const std::string mgmtId;
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,158 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewConnectionHandler.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+
+namespace 
+{
+const std::string PLAIN = "PLAIN";
+const std::string en_US = "en_US";
+}
+
+void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) {
+    FieldTable properties;
+    string mechanisms(PLAIN);
+    string locales(en_US);
+    handler->serverMode = true;
+    handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+}
+
+void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+    handler->client.close(code, text, classId, methodId);
+}
+
+void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
+{
+    AMQMethodBody* method=frame.getBody()->getMethod();
+    try{
+        if (handler->serverMode) {
+            if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        } else {
+            if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+                throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+        }
+    }catch(ConnectionException& e){
+        handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection)  : handler(new Handler(connection)) {}
+
+PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()), 
+                                                      connection(c), serverMode(false) {}
+
+void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+    const string& mechanism, 
+    const string& response, const string& /*locale*/)
+{
+    //TODO: handle SASL mechanisms more cleverly
+    if (mechanism == PLAIN) {
+        if (response.size() > 0 && response[0] == (char) 0) {
+            string temp = response.substr(1);
+            string::size_type i = temp.find((char)0);
+            string uid = temp.substr(0, i);
+            string pwd = temp.substr(i + 1);
+            //TODO: authentication
+            connection.setUserId(uid);
+        }
+    }
+    client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
+}
+        
+void PreviewConnectionHandler::Handler::secureOk(const string& /*response*/){}
+        
+void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
+    uint32_t framemax, uint16_t heartbeat)
+{
+    connection.setFrameMax(framemax);
+    connection.setHeartbeat(heartbeat);
+}
+        
+void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/,
+    const string& /*capabilities*/, bool /*insist*/)
+{
+    string knownhosts;
+    client.openOk(knownhosts);
+}
+
+        
+void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, 
+    uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+    client.closeOk();
+    connection.getOutput().close();
+} 
+        
+void PreviewConnectionHandler::Handler::closeOk(){
+    connection.getOutput().close();
+} 
+
+
+void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+                                       uint8_t /*versionMinor*/,
+                                       const FieldTable& /*serverProperties*/,
+                                       const string& /*mechanisms*/,
+                                       const string& /*locales*/)
+{
+    string uid = "qpidd";
+    string pwd = "qpidd";
+    string response = ((char)0) + uid + ((char)0) + pwd;
+    server.startOk(FieldTable(), PLAIN, response, en_US);
+    connection.initMgmt(true);
+}
+
+void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+    server.secureOk("");
+}
+
+void PreviewConnectionHandler::Handler::tune(uint16_t channelMax,
+                                      uint32_t frameMax,
+                                      uint16_t heartbeat)
+{
+    connection.setFrameMax(frameMax);
+    connection.setHeartbeat(heartbeat);
+    server.tuneOk(channelMax, frameMax, heartbeat);
+    server.open("/", "", true);
+}
+
+void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
+{
+}
+
+void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
+{
+    
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnectionAdapter_
+#define _PreviewConnectionAdapter_
+
+#include <memory>
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+
+// TODO aconway 2007-09-18: Rename to ConnectionHandler
+class PreviewConnectionHandler : public framing::FrameHandler
+{
+    struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, 
+        public framing::AMQP_ClientOperations::ConnectionHandler
+    {
+        framing::AMQP_ClientProxy::Connection client;
+        framing::AMQP_ServerProxy::Connection server;
+        PreviewConnection& connection;
+        bool serverMode;
+    
+        Handler(PreviewConnection& connection);
+        void startOk(const qpid::framing::FieldTable& clientProperties,
+                     const std::string& mechanism, const std::string& response,
+                     const std::string& locale); 
+        void secureOk(const std::string& response); 
+        void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); 
+        void open(const std::string& virtualHost,
+                  const std::string& capabilities, bool insist); 
+        void close(uint16_t replyCode, const std::string& replyText,
+                   uint16_t classId, uint16_t methodId); 
+        void closeOk(); 
+
+
+        void start(uint8_t versionMajor,
+                   uint8_t versionMinor,
+                   const qpid::framing::FieldTable& serverProperties,
+                   const std::string& mechanisms,
+                   const std::string& locales);
+        
+        void secure(const std::string& challenge);
+        
+        void tune(uint16_t channelMax,
+                  uint32_t frameMax,
+                  uint16_t heartbeat);
+        
+        void openOk(const std::string& knownHosts);
+        
+        void redirect(const std::string& host, const std::string& knownHosts);        
+    };
+    std::auto_ptr<Handler> handler;
+  public:
+    PreviewConnectionHandler(PreviewConnection& connection);
+    void init(const framing::ProtocolInitiation& header);
+    void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
+    void handle(framing::AMQFrame& frame);
+};
+
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionHandler.h"
+#include "SessionState.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+using namespace framing;
+using namespace std;
+using namespace qpid::sys;
+
+PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
+    : SessionContext(c.getOutput()),
+      connection(c), channel(ch, &c.getOutput()),
+      proxy(out),               // Via my own handleOut() for L2 data.
+      peerSession(channel),     // Direct to channel for L2 commands.
+      ignoring(false) {}
+
+PreviewSessionHandler::~PreviewSessionHandler() {}
+
+namespace {
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+} // namespace
+
+void PreviewSessionHandler::handleIn(AMQFrame& f) {
+    // Note on channel states: a channel is open if session != 0.  A
+    // channel that is closed (session == 0) can be in the "ignoring"
+    // state. This is a temporary state after we have sent a channel
+    // exception, where extra frames might arrive that should be
+    // ignored.
+    //
+    AMQMethodBody* m = f.getBody()->getMethod();
+    try {
+        if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (session.get()) {
+            boost::optional<SequenceNumber> ack=session->received(f);
+            session->in.handle(f);
+            if (ack)
+                peerSession.ack(*ack, SequenceNumberSet());
+        } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (!ignoring) {
+            throw ChannelErrorException(
+                QPID_MSG("Channel " << channel.get() << " is not open"));
+        }
+    } catch(const ChannelException& e) {
+        ignoring=true;          // Ignore trailing frames sent by client.
+        session->detach();
+        session.reset();
+        peerSession.closed(e.code, e.what());
+    }catch(const ConnectionException& e){
+        connection.close(e.code, e.what(), classId(m), methodId(m));
+    }catch(const std::exception& e){
+        connection.close(
+            framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+    }
+}
+
+void PreviewSessionHandler::handleOut(AMQFrame& f) {
+    channel.handle(f);          // Send it.
+    if (session->sent(f))
+        peerSession.solicitAck();
+}
+
+void PreviewSessionHandler::assertAttached(const char* method) const {
+    if (!session.get())
+        throw ChannelErrorException(
+            QPID_MSG(method << " failed: No session for channel "
+                     << getChannel()));
+}
+
+void PreviewSessionHandler::assertClosed(const char* method) const {
+    if (session.get())
+        throw ChannelBusyException(
+            QPID_MSG(method << " failed: channel " << channel.get()
+                     << " is already open."));
+}
+
+void  PreviewSessionHandler::open(uint32_t detachedLifetime) {
+    assertClosed("open");
+    std::auto_ptr<SessionState> state(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    session.reset(state.release());
+    peerSession.attached(session->getId(), session->getTimeout());
+}
+
+void  PreviewSessionHandler::resume(const Uuid& id) {
+    assertClosed("resume");
+    session = connection.broker.getSessionManager().resume(id);
+    session->attach(*this);
+    SequenceNumber seq = session->resuming();
+    peerSession.attached(session->getId(), session->getTimeout());
+    proxy.getSession().ack(seq, SequenceNumberSet());
+}
+
+void  PreviewSessionHandler::flow(bool /*active*/) {
+    assertAttached("flow");
+    // TODO aconway 2007-09-19: Removed in 0-10, remove 
+    assert(0); throw NotImplementedException("session.flow");
+}
+
+void  PreviewSessionHandler::flowOk(bool /*active*/) {
+    assertAttached("flowOk");
+    // TODO aconway 2007-09-19: Removed in 0-10, remove 
+    assert(0); throw NotImplementedException("session.flowOk");
+}
+
+void  PreviewSessionHandler::close() {
+    assertAttached("close");
+    QPID_LOG(info, "Received session.close");
+    ignoring=false;
+    session->detach();
+    session.reset();
+    peerSession.closed(REPLY_SUCCESS, "ok");
+    assert(&connection.getChannel(channel.get()) == this);
+    connection.closeChannel(channel.get()); 
+}
+
+void  PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) {
+    QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+    ignoring=false;
+    session->detach();
+    session.reset();
+}
+
+void PreviewSessionHandler::localSuspend() {
+    if (session.get() && session->isAttached()) {
+        session->detach();
+        connection.broker.getSessionManager().suspend(session);
+        session.reset();
+    }
+}
+
+void  PreviewSessionHandler::suspend() {
+    assertAttached("suspend");
+    localSuspend();
+    peerSession.detached();
+    assert(&connection.getChannel(channel.get()) == this);
+    connection.closeChannel(channel.get()); 
+}
+
+void  PreviewSessionHandler::ack(uint32_t     cumulativeSeenMark,
+                          const SequenceNumberSet& /*seenFrameSet*/)
+{
+    assertAttached("ack");
+    if (session->getState() == SessionState::RESUMING) {
+        session->receivedAck(cumulativeSeenMark);
+        framing::SessionState::Replay replay=session->replay();
+        std::for_each(replay.begin(), replay.end(),
+                      boost::bind(&PreviewSessionHandler::handleOut, this, _1));
+    }
+    else
+        session->receivedAck(cumulativeSeenMark);
+}
+
+void  PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+    // TODO aconway 2007-10-02: may be removed from spec.
+    assert(0); throw NotImplementedException("session.high-water-mark");
+}
+
+void  PreviewSessionHandler::solicitAck() {
+    assertAttached("solicit-ack");
+    peerSession.ack(session->sendingAck(), SequenceNumberSet());    
+}
+
+void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+    std::auto_ptr<SessionState> state(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    session.reset(state.release());
+}
+
+void PreviewSessionHandler::detached()
+{
+    connection.broker.getSessionManager().suspend(session);
+    session.reset();
+}
+
+ConnectionState& PreviewSessionHandler::getConnection() { return connection; }
+const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; }
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,111 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H
+#define QPID_BROKER_PREVIEWSESSIONHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+class SessionState;
+
+/**
+ * A SessionHandler is associated with each active channel. It
+ * receives incoming frames, handles session commands and manages the
+ * association between the channel and a session.
+ */
+class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
+                       public framing::AMQP_ClientOperations::SessionHandler,
+                       public SessionContext,
+                       private boost::noncopyable
+{
+  public:
+    PreviewSessionHandler(PreviewConnection&, framing::ChannelId);
+    ~PreviewSessionHandler();
+
+    /** Returns 0 if not attached to a session */
+    SessionState* getSession() { return session.get(); }
+    const SessionState* getSession() const { return session.get(); }
+
+    framing::ChannelId getChannel() const { return channel.get(); }
+    
+    ConnectionState& getConnection();
+    const ConnectionState& getConnection() const;
+
+    framing::AMQP_ClientProxy& getProxy() { return proxy; }
+    const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+
+    // Called by closing connection.
+    void localSuspend();
+    void detach() { localSuspend(); }
+    
+  protected:
+    void handleIn(framing::AMQFrame&);
+    void handleOut(framing::AMQFrame&);
+    
+  private:
+    /// Session methods
+    void open(uint32_t detachedLifetime);
+    void flow(bool active);
+    void flowOk(bool active);
+    void close();
+    void closed(uint16_t replyCode, const std::string& replyText);
+    void resume(const framing::Uuid& sessionId);
+    void suspend();
+    void ack(uint32_t cumulativeSeenMark,
+             const framing::SequenceNumberSet& seenFrameSet);
+    void highWaterMark(uint32_t lastSentMark);
+    void solicitAck();
+    
+    //extra methods required for assuming client role
+    void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+    void detached();
+
+
+    void assertAttached(const char* method) const;
+    void assertActive(const char* method) const;
+    void assertClosed(const char* method) const;
+
+
+    PreviewConnection& connection;
+    framing::ChannelHandler channel;
+    framing::AMQP_ClientProxy proxy;
+    framing::AMQP_ClientProxy::Session peerSession;
+    bool ignoring;
+    std::auto_ptr<SessionState> session;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!QPID_BROKER_SESSIONHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Feb 21 09:40:42 2008
@@ -21,11 +21,10 @@
 
 #include "SemanticHandler.h"
 #include "SemanticState.h"
-#include "SessionHandler.h"
+#include "SessionContext.h"
 #include "SessionState.h"
 #include "BrokerAdapter.h"
 #include "MessageDelivery.h"
-#include "Connection.h"
 #include "qpid/framing/ExecutionCompleteBody.h"
 #include "qpid/framing/ExecutionResultBody.h"
 #include "qpid/framing/ServerInvoker.h"
@@ -165,7 +164,7 @@
 
 DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
 {
-    SessionHandler* handler = session.getHandler();
+    SessionContext* handler = session.getHandler();
     if (handler) {
         uint32_t maxFrameSize = handler->getConnection().getFrameMax();
         MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Thu Feb 21 09:40:42 2008
@@ -77,7 +77,7 @@
     DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
-    Connection& getConnection() { return session.getConnection(); }
+    //Connection& getConnection() { return session.getConnection(); }
     Broker& getBroker() { return session.getBroker(); }
 
 public:

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=629883&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Thu Feb 21 09:40:42 2008
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_SESSIONCONTEXT_H
+#define QPID_BROKER_SESSIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "ConnectionState.h"
+
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SessionContext : public framing::FrameHandler::InOutHandler                       
+{
+  public:
+    SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
+    virtual ~SessionContext(){}
+    virtual ConnectionState& getConnection() = 0;
+    virtual const ConnectionState& getConnection() const = 0;
+    virtual framing::AMQP_ClientProxy& getProxy() = 0;
+    virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
+    virtual void detach() = 0;
+    virtual framing::ChannelId getChannel() const = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!QPID_BROKER_SESSIONCONTEXT_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Thu Feb 21 09:40:42 2008
@@ -36,7 +36,7 @@
 using namespace qpid::sys;
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
-    : InOutHandler(0, &c.getOutput()),
+    : SessionContext(c.getOutput()),
       connection(c), channel(ch, &c.getOutput()),
       proxy(out),               // Via my own handleOut() for L2 data.
       peerSession(channel),     // Direct to channel for L2 commands.
@@ -203,5 +203,9 @@
     connection.broker.getSessionManager().suspend(session);
     session.reset();
 }
+
+
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Thu Feb 21 09:40:42 2008
@@ -28,6 +28,7 @@
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -42,9 +43,9 @@
  * receives incoming frames, handles session commands and manages the
  * association between the channel and a session.
  */
-class SessionHandler : public framing::FrameHandler::InOutHandler,
-                       public framing::AMQP_ServerOperations::SessionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
                        public framing::AMQP_ClientOperations::SessionHandler,
+                       public SessionContext,
                        private boost::noncopyable
 {
   public:
@@ -57,14 +58,15 @@
 
     framing::ChannelId getChannel() const { return channel.get(); }
     
-    Connection& getConnection() { return connection; }
-    const Connection& getConnection() const { return connection; }
+    ConnectionState& getConnection();
+    const ConnectionState& getConnection() const;
 
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
 
     // Called by closing connection.
     void localSuspend();
+    void detach() { localSuspend(); }
     
   protected:
     void handleIn(framing::AMQFrame&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Thu Feb 21 09:40:42 2008
@@ -45,7 +45,7 @@
 
 // FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
 std::auto_ptr<SessionState>  SessionManager::open(
-    SessionHandler& h, uint32_t timeout_)
+    SessionContext& h, uint32_t timeout_)
 {
     Mutex::ScopedLock l(lock);
     std::auto_ptr<SessionState> session(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Thu Feb 21 09:40:42 2008
@@ -38,7 +38,7 @@
 namespace broker {
 
 class SessionState;
-class SessionHandler;
+class SessionContext;
 
 /**
  * Create and manage SessionState objects.
@@ -57,7 +57,7 @@
     ~SessionManager();
     
     /** Open a new active session, caller takes ownership */
-    std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
+    std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
     
     /** Suspend a session, start it's timeout counter.
      * The factory takes ownership.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Feb 21 09:40:42 2008
@@ -20,8 +20,8 @@
  */
 #include "SessionState.h"
 #include "SessionManager.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SessionContext.h"
+#include "ConnectionState.h"
 #include "Broker.h"
 #include "SemanticHandler.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -37,7 +37,7 @@
 using qpid::management::Args;
 
 SessionState::SessionState(
-    SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) 
+    SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) 
     : framing::SessionState(ack, timeout_ > 0),
       factory(f), handler(h), id(true), timeout(timeout_),
       broker(h->getConnection().broker),
@@ -76,7 +76,7 @@
         mgmtObject->resourceDestroy ();
 }
 
-SessionHandler* SessionState::getHandler() {
+SessionContext* SessionState::getHandler() {
     return handler;
 }
 
@@ -85,7 +85,7 @@
     return getHandler()->getProxy();
 }
 
-Connection& SessionState::getConnection() {
+ConnectionState& SessionState::getConnection() {
     assert(isAttached());
     return getHandler()->getConnection();
 }
@@ -100,7 +100,7 @@
     }
 }
 
-void SessionState::attach(SessionHandler& h) {
+void SessionState::attach(SessionContext& h) {
     {
         Mutex::ScopedLock l(lock);
         handler = &h;
@@ -141,7 +141,7 @@
     case management::Session::METHOD_DETACH :
         if (handler != 0)
         {
-            handler->localSuspend ();
+            handler->detach();
         }
         status = Manageable::STATUS_OK;
         break;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Feb 21 09:40:42 2008
@@ -48,10 +48,10 @@
 namespace broker {
 
 class SemanticHandler;
-class SessionHandler;
+class SessionContext;
 class SessionManager;
 class Broker;
-class Connection;
+class ConnectionState;
 
 /**
  * Broker-side session state includes sessions handler chains, which may
@@ -67,16 +67,16 @@
     bool isAttached() { return handler; }
 
     void detach();
-    void attach(SessionHandler& handler);
+    void attach(SessionContext& handler);
 
     
-    SessionHandler* getHandler();
+    SessionContext* getHandler();
 
     /** @pre isAttached() */
     framing::AMQP_ClientProxy& getProxy();
     
     /** @pre isAttached() */
-    Connection& getConnection();
+    ConnectionState& getConnection();
 
     uint32_t getTimeout() const { return timeout; }
     Broker& getBroker() { return broker; }
@@ -92,14 +92,14 @@
 
     // Normally SessionManager creates sessions.
     SessionState(SessionManager*,
-                 SessionHandler* out,
+                 SessionContext* out,
                  uint32_t timeout,
                  uint32_t ackInterval);
     
 
   private:
     SessionManager* factory;
-    SessionHandler* handler;    
+    SessionContext* handler;    
     framing::Uuid id;
     uint32_t timeout;
     sys::AbsTime expiry;        // Used by SessionManager.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Thu Feb 21 09:40:42 2008
@@ -46,9 +46,9 @@
 
 class ScopedSync
 {
-    Session_0_10& session;
+    Session& session;
   public:
-    ScopedSync(Session_0_10& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
+    ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
     ~ScopedSync() { session.setSynchronous(false); }
 };
 
@@ -63,7 +63,7 @@
     join();
 }
 
-void Channel::open(const Session_0_10& s)
+void Channel::open(const Session& s)
 {
     Mutex::ScopedLock l(stopLock);
     if (isOpen())

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h Thu Feb 21 09:40:42 2008
@@ -29,7 +29,7 @@
 #include "Message.h"
 #include "Queue.h"
 #include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
 #include "qpid/Exception.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
@@ -79,7 +79,7 @@
     bool running;
 
     ConsumerMap consumers;
-    Session_0_10 session;
+    Session session;
     framing::ChannelId channelId;
     sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
     framing::Uuid uniqueId;
@@ -88,7 +88,7 @@
 
     void stop();
 
-    void open(const Session_0_10& session);
+    void open(const Session& session);
     void closeInternal();
     void join();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Thu Feb 21 09:40:42 2008
@@ -73,7 +73,7 @@
     channel.open(newSession(ASYNC));
 }
 
-Session_0_10 Connection::newSession(SynchronousMode sync,
+Session Connection::newSession(SynchronousMode sync,
                                     uint32_t detachedLifetime)
 {
     shared_ptr<SessionCore> core(
@@ -81,10 +81,10 @@
     core->setSync(sync);
     impl->addSession(core);
     core->open(detachedLifetime);
-    return Session_0_10(core);
+    return Session(core);
 }
 
-void Connection::resume(Session_0_10& session) {
+void Connection::resume(Session& session) {
     session.impl->setChannel(++channelIdCounter);
     impl->addSession(session.impl);
     session.impl->resume(impl);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Feb 21 09:40:42 2008
@@ -25,7 +25,7 @@
 #include <string>
 #include "Channel.h"
 #include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/Uuid.h"
 
@@ -134,13 +134,13 @@
      * that the broker may discard the session state. Default is 0,
      * meaning the session cannot be resumed.
      */
-    Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
+    Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
 
     /**
      * Resume a suspendded session. A session may be resumed
      * on a different connection to the one that created it.
      */
-    void resume(Session_0_10& session);
+    void resume(Session& session);
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Feb 21 09:40:42 2008
@@ -20,7 +20,6 @@
  */
 #include "Dispatcher.h"
 
-#include "qpid/client/Session_0_10.h"
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
@@ -38,7 +37,7 @@
 namespace qpid {
 namespace client {
 
-Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
+Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
 
 void Subscriber::received(Message& msg)
 {
@@ -48,7 +47,7 @@
     }
 }
 
-Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+Dispatcher::Dispatcher(Session& s, const std::string& q)
     : session(s), running(false), autoStop(true)
 {
     queue = q.empty() ? 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Thu Feb 21 09:40:42 2008
@@ -25,6 +25,7 @@
 #include <memory>
 #include <string>
 #include <boost/shared_ptr.hpp>
+#include "qpid/client/Session.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
@@ -34,17 +35,15 @@
 namespace qpid {
 namespace client {
 
-class Session_0_10;
-
 class Subscriber : public MessageListener
 {
-    Session_0_10& session;
+    Session& session;
     MessageListener* const listener;
     AckPolicy autoAck;
 
 public:
     typedef boost::shared_ptr<Subscriber> shared_ptr;
-    Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
+    Subscriber(Session& session, MessageListener* listener, AckPolicy);
     void received(Message& msg);
     
 };
@@ -56,7 +55,7 @@
     typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
     sys::Mutex lock;
     sys::Thread worker;
-    Session_0_10& session;
+    Session& session;
     Demux::QueuePtr queue;
     bool running;
     bool autoStop;
@@ -68,7 +67,7 @@
     bool isStopped();
 
 public:
-    Dispatcher(Session_0_10& session, const std::string& queue = "");
+    Dispatcher(Session& session, const std::string& queue = "");
 
     void start();
     void run();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Thu Feb 21 09:40:42 2008
@@ -50,7 +50,7 @@
 
   private:
   friend class SubscriptionManager;
-    Session_0_10 session;
+    Session session;
     Demux::QueuePtr queue;
     AckPolicy autoAck;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Thu Feb 21 09:40:42 2008
@@ -22,7 +22,7 @@
  *
  */
 #include <string>
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/TransferContent.h"
 
@@ -63,18 +63,18 @@
         return getMessageProperties().getApplicationHeaders(); 
     }
 
-    void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
+    void acknowledge(Session& session, bool cumulative = true, bool send = true) const
     {
         session.getExecution().completed(id, cumulative, send);
     }
 
     void acknowledge(bool cumulative = true, bool send = true) const
     {
-        const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
+        const_cast<Session&>(session).getExecution().completed(id, cumulative, send);
     }
 
     /**@internal for incoming messages */
-    Message(const framing::FrameSet& frameset, Session_0_10 s) :
+    Message(const framing::FrameSet& frameset, Session s) :
         method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
     {
         populate(frameset);
@@ -91,12 +91,12 @@
     }
 
     /**@internal use for incoming messages. */
-    void setSession(Session_0_10 s) { session=s; }
+    void setSession(Session s) { session=s; }
 private:
     //method and id are only set for received messages:
     framing::MessageTransferBody method;
     framing::SequenceNumber id;
-    Session_0_10 session;
+    Session session;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h?rev=629883&r1=629882&r2=629883&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Session.h Thu Feb 21 09:40:42 2008
@@ -21,7 +21,7 @@
  * under the License.
  *
  */
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session_99_0.h"
 
 namespace qpid {
 namespace client {
@@ -31,7 +31,7 @@
  *
  * \ingroup clientapi
  */
-typedef Session_0_10 Session;
+typedef Session_99_0 Session;
 
 }} // namespace qpid::client