You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/20 00:34:18 UTC

svn commit: r577459 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/client/

Author: aconway
Date: Wed Sep 19 15:34:11 2007
New Revision: 577459

URL: http://svn.apache.org/viewvc?rev=577459&view=rev
Log:

AMQP 0-10 Session supported on broker and client.

Client always uses session on the wire but client::Channel API is
still available until all C++ tests are migrated.

Broker allows both session and channel connections to support python
tests.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
      - copied, changed from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
      - copied, changed from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 19 15:34:11 2007
@@ -218,7 +218,7 @@
   qpid/client/MessageListener.cpp		\
   qpid/client/Correlator.cpp			\
   qpid/client/CompletionTracker.cpp		\
-  qpid/client/ChannelHandler.cpp		\
+  qpid/client/SessionHandler.cpp		\
   qpid/client/ConnectionHandler.cpp		\
   qpid/client/ExecutionHandler.cpp		\
   qpid/client/FutureCompletion.cpp		\
@@ -308,7 +308,7 @@
   qpid/client/BlockingQueue.h \
   qpid/client/Correlator.h \
   qpid/client/CompletionTracker.h \
-  qpid/client/ChannelHandler.h \
+  qpid/client/SessionHandler.h \
   qpid/client/ChainableFrameHandler.h	\
   qpid/client/ConnectionHandler.h \
   qpid/client/Execution.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Wed Sep 19 15:34:11 2007
@@ -60,6 +60,7 @@
     : adapter(&a),
       broker(adapter->getConnection().broker),
       timeout(t),
+      id(true),
       prefetchSize(0),
       prefetchCount(0),
       tagGenerator("sgen"),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Wed Sep 19 15:34:11 2007
@@ -34,6 +34,7 @@
 #include "TxBuffer.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/shared_ptr.h"
 
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -96,6 +97,7 @@
     SessionHandler* adapter;
     Broker& broker;
     uint32_t timeout;
+    framing::Uuid id;
     boost::ptr_vector<framing::FrameHandler>  handlers;
 
     DeliveryAdapter* deliveryAdapter;
@@ -135,8 +137,10 @@
 
     Broker& getBroker() const { return broker; }
     
-    /** Session timeout. */
+    /** Session timeout, aka detached-lifetime. */
     uint32_t getTimeout() const { return timeout; }
+    /** Session ID */
+    const framing::Uuid& getId() const { return id; }
     
     /**
      * Get named queue, never returns 0.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Sep 19 15:34:11 2007
@@ -28,11 +28,13 @@
 namespace qpid {
 namespace broker {
 using namespace framing;
+using namespace std;
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
     : InOutHandler(0, &c.getOutput()),
       connection(c), channel(ch), proxy(out),
-      ignoring(false), channelHandler(*this) {}
+      ignoring(false), channelHandler(*this),
+      useChannelClose(false) {}
 
 SessionHandler::~SessionHandler() {}
 
@@ -50,18 +52,22 @@
     // 
     AMQMethodBody* m=f.getMethod();
     try {
-        if (m && m->invoke(&channelHandler))
+        if (m && (m->invoke(this) || m->invoke(&channelHandler)))
             return;
         else if (session)
             session->in(f);
         else if (!ignoring)
             throw ChannelErrorException(
                 QPID_MSG("Channel " << channel << " is not open"));
-    } catch(const ChannelException& e){
-        getProxy().getChannel().close(
-            e.code, e.toString(), classId(m), methodId(m));
-        session.reset();
+    } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
+        session.reset();
+        // FIXME aconway 2007-09-19: Dual-mode hack.
+        if (useChannelClose)
+            getProxy().getChannel().close(
+                e.code, e.toString(), classId(m), methodId(m));
+        else
+            getProxy().getSession().closed(e.code, e.toString());
     }catch(const ConnectionException& e){
         connection.close(e.code, e.what(), classId(m), methodId(m));
     }catch(const std::exception& e){
@@ -93,6 +99,7 @@
 }
 
 void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){
+    parent.useChannelClose=true;
     parent.assertClosed("open");
     parent.session.reset(new Session(parent, 0));
     parent.getProxy().getChannel().openOk();
@@ -112,7 +119,7 @@
 {
     // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
     // to text names.
-    QPID_LOG(warning, "Received session.close("<<replyCode<<","
+    QPID_LOG(warning, "Received channel.close("<<replyCode<<","
              <<replyText << ","
              << "classid=" <<classId<< ","
              << "methodid=" <<methodId);
@@ -134,6 +141,62 @@
 {
     //no specific action required, generic response handling should be
     //sufficient
+}
+
+void  SessionHandler::open(uint32_t detachedLifetime) {
+    assertClosed("open");
+    session.reset(new Session(*this, detachedLifetime));
+    getProxy().getSession().attached(session->getId(), session->getTimeout());
+}
+
+void  SessionHandler::flow(bool /*active*/) {
+    // FIXME aconway 2007-09-19: Removed in 0-10, remove 
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::flowOk(bool /*active*/) {
+    // FIXME aconway 2007-09-19: Removed in 0-10, remove 
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::close() {
+    QPID_LOG(info, "Received session.close");
+    ignoring=false;
+    session.reset();
+    getProxy().getSession().closed(REPLY_SUCCESS, "ok");
+    // No need to remove from connection map, will be re-used
+    // if channel is re-opened.
+}
+
+void  SessionHandler::closed(uint16_t replyCode, const string& replyText) {
+    // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+    // to text names.
+    QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+    ignoring=false;
+    session.reset();
+    // No need to remove from connection map, will be re-used
+    // if channel is re-opened.
+}
+
+void  SessionHandler::resume(const Uuid& /*sessionId*/) {
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::suspend() {
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::ack(uint32_t     /*cumulativeSeenMark*/,
+                          const SequenceNumberSet& /*seenFrameSet*/) {
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+    assert(0); throw NotImplementedException();
+}
+
+void  SessionHandler::solicitAck() {
+    assert(0); throw NotImplementedException();
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Sep 19 15:34:11 2007
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSIONADAPTER_H
-#define QPID_BROKER_SESSIONADAPTER_H
+#ifndef QPID_BROKER_SESSIONHANDLER_H
+#define QPID_BROKER_SESSIONHANDLER_H
 
 /*
  *
@@ -40,7 +40,8 @@
  *
  * SessionHandlers can be stored in a map by value.
  */
-class SessionHandler : public framing::FrameHandler::InOutHandler
+class SessionHandler : public framing::FrameHandler::InOutHandler,
+                       private framing::AMQP_ServerOperations::SessionHandler
 {
   public:
     SessionHandler(Connection&, framing::ChannelId);
@@ -63,7 +64,7 @@
     void handleOut(framing::AMQFrame&);
     
   private:
-    // FIXME aconway 2007-08-31: Move to session methods.
+    // FIXME aconway 2007-08-31: Drop channel.
     struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler {
         SessionHandler& parent;
 
@@ -81,7 +82,21 @@
         void closeOk(); 
     };
   friend class ChannelMethods;
-        
+
+    /// Session methods
+    void open(uint32_t detachedLifetime);
+    void flow(bool active);
+    void flowOk(bool active);
+    void close();
+    void closed(uint16_t replyCode, const std::string& replyText);
+    void resume(const framing::Uuid& sessionId);
+    void suspend();
+    void ack(uint32_t cumulativeSeenMark,
+             const framing::SequenceNumberSet& seenFrameSet);
+    void highWaterMark(uint32_t lastSentMark);
+    void solicitAck();
+
+
     void assertOpen(const char* method);
     void assertClosed(const char* method);
 
@@ -91,8 +106,11 @@
     shared_ptr<Session> session;
     bool ignoring;
     ChannelMethods channelHandler;
+    bool useChannelClose;       // FIXME aconway 2007-09-19: remove with channel.
 };
 
 }} // namespace qpid::broker
 
-#endif  /*!QPID_BROKER_SESSIONADAPTER_H*/
+
+
+#endif  /*!QPID_BROKER_SESSIONHANDLER_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 19 15:34:11 2007
@@ -33,7 +33,7 @@
 {
     l2.out = boost::bind(&FrameHandler::handle, out, _1);
     l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
-    l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1);
+    l3.out = boost::bind(&SessionHandler::outgoing, &l2, _1);
     l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=577459&r1=577458&r2=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Wed Sep 19 15:34:11 2007
@@ -28,7 +28,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
-#include "ChannelHandler.h"
+#include "SessionHandler.h"
 #include "ExecutionHandler.h"
 
 namespace qpid {
@@ -45,7 +45,7 @@
     };
 
     ExecutionHandler l3;
-    ChannelHandler l2;
+    SessionHandler l2;
     const uint16_t id;
     bool sync;
     bool isClosed;

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp (from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp&r1=577297&r2=577459&rev=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp Wed Sep 19 15:34:11 2007
@@ -19,7 +19,7 @@
  *
  */
 
-#include "ChannelHandler.h"
+#include "SessionHandler.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/all_method_bodies.h"
 
@@ -27,14 +27,14 @@
 using namespace qpid::framing;
 using namespace boost;
 
-ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
+SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
 
-void ChannelHandler::incoming(AMQFrame& frame)
+void SessionHandler::incoming(AMQFrame& frame)
 {
     AMQBody* body = frame.getBody();
     if (getState() == OPEN) {
-        ChannelCloseBody* closeBody=
-            dynamic_cast<ChannelCloseBody*>(body->getMethod());
+        SessionClosedBody* closeBody=
+            dynamic_cast<SessionClosedBody*>(body->getMethod());
         if (closeBody) {
             setState(CLOSED_BY_PEER);
             code = closeBody->getReplyCode();
@@ -46,12 +46,7 @@
             try {
                 in(frame);
             }catch(ChannelException& e){
-                AMQMethodBody* method=body->getMethod();
-                if (method)
-                    close(e.code, e.toString(),
-                          method->amqpClassId(), method->amqpMethodId());
-                else
-                    close(e.code, e.toString(), 0, 0);
+                closed(e.code, e.toString());
             }
         }
     } else {
@@ -62,7 +57,7 @@
     }
 }
 
-void ChannelHandler::outgoing(AMQFrame& frame)
+void SessionHandler::outgoing(AMQFrame& frame)
 {
     if (getState() == OPEN) {
         frame.setChannel(id);
@@ -74,12 +69,12 @@
     }
 }
 
-void ChannelHandler::open(uint16_t _id)
+void SessionHandler::open(uint16_t _id)
 {
     id = _id;
 
     setState(OPENING);
-    AMQFrame f(id, ChannelOpenBody(version));
+    AMQFrame f(id, SessionOpenBody(version));
     out(f);
 
     std::set<int> states;
@@ -91,37 +86,39 @@
     }
 }
 
-void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+void SessionHandler::close()
 {
     setState(CLOSING);
-    AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
+    AMQFrame f(id, SessionCloseBody(version));
     out(f);
+    waitFor(CLOSED);
 }
 
-void ChannelHandler::close()
+void SessionHandler::closed(uint16_t code, const std::string& msg)
 {
-    close(200, "OK", 0, 0);
-    waitFor(CLOSED);
+    setState(CLOSED);
+    AMQFrame f(id, SessionClosedBody(version, code, msg));
+    out(f);
 }
 
-void ChannelHandler::handleMethod(AMQMethodBody* method)
+void SessionHandler::handleMethod(AMQMethodBody* method)
 {
     switch (getState()) {
       case OPENING:
-        if (method->isA<ChannelOpenOkBody>()) {
+        if (method->isA<SessionAttachedBody>()) {
             setState(OPEN);
         } else {
             throw ConnectionException(504, "Channel not opened.");
         }
         break;
       case CLOSING:
-        if (method->isA<ChannelCloseOkBody>()) {
+        if (method->isA<SessionClosedBody>()) {
             setState(CLOSED);
         } //else just ignore it
         break;
       case CLOSED:
         throw ConnectionException(504, "Channel is closed.");
       default:
-        throw Exception("Unexpected state encountered in ChannelHandler!");
+        throw Exception("Unexpected state encountered in SessionHandler!");
     }
 }

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h (from r577297, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h&r1=577297&r2=577459&rev=577459&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h Wed Sep 19 15:34:11 2007
@@ -18,8 +18,8 @@
  * under the License.
  *
  */
-#ifndef _ChannelHandler_
-#define _ChannelHandler_
+#ifndef _SessionHandler_
+#define _SessionHandler_
 
 #include "StateManager.h"
 #include "ChainableFrameHandler.h"
@@ -28,7 +28,7 @@
 namespace qpid {
 namespace client {
 
-class ChannelHandler : private StateManager, public ChainableFrameHandler
+class SessionHandler : private StateManager, public ChainableFrameHandler
 {
     enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
     framing::ProtocolVersion version;
@@ -38,21 +38,19 @@
     std::string text;
 
     void handleMethod(framing::AMQMethodBody* method);
-
-    void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
-
+    void closed(uint16_t code, const std::string& msg);
 
 public:
     typedef boost::function<void(uint16_t, const std::string&)> CloseListener;    
 
-    ChannelHandler();
+    SessionHandler();
 
     void incoming(framing::AMQFrame& frame);
     void outgoing(framing::AMQFrame& frame);
 
     void open(uint16_t id);
     void close();
-
+    
     CloseListener onClose;
 };