You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/10/21 19:03:33 UTC

svn commit: r828108 - in /qpid/trunk/qpid/cpp/src/qpid: amqp_0_10/SessionHandler.cpp amqp_0_10/SessionHandler.h client/ConnectionImpl.cpp client/SessionImpl.cpp client/SessionImpl.h cluster/UpdateClient.cpp

Author: aconway
Date: Wed Oct 21 17:03:33 2009
New Revision: 828108

URL: http://svn.apache.org/viewvc?rev=828108&view=rev
Log:
Fix problems with sessions going out of scope and session numbers wrapping around.

Fixes QPID-1789: sessions that go out of scope without being detached will
detach themselves.

Also fixes several issues that arise when session numbers wrap around
and start re-using old numbers.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Oct 21 17:03:33 2009
@@ -35,14 +35,14 @@
 using namespace std;
 
 void SessionHandler::checkAttached() {
-    if (!getState()) {
-        ignoring = true;
+    if (!getState()) 
         throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
-    }
 }
 
 SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
-    : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
+    : channel(ch, out), peer(channel),
+      awaitingDetached(false),
+      sendReady(), receiveReady() {}
 
 SessionHandler::~SessionHandler() {}
 
@@ -50,7 +50,7 @@
 bool isSessionControl(AMQMethodBody* m) {
     return m &&
         m->amqpClassId() == SESSION_CLASS_ID;
-}
+    }
 bool isSessionDetachedControl(AMQMethodBody* m) {
     return isSessionControl(m) &&
         m->amqpMethodId() == SESSION_DETACHED_METHOD_ID;
@@ -76,12 +76,13 @@
     // Note on channel states: a channel is attached if session != 0
     AMQMethodBody* m = f.getBody()->getMethod();
     try {
-        if (ignoring && !isSessionDetachedControl(m))
-            return;
-        else if (isSessionControl(m))
+        if (isSessionControl(m)) {
             invoke(*m);
+        }
         else {
-            checkAttached();
+            // Drop frames if we are awaiting a detached control or
+            // if we are currently detached.
+            if (awaitingDetached || !getState()) return;  
             if (!receiveReady)
                 throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
             if (!getState()->receiverRecord(f))
@@ -142,11 +143,11 @@
     // Save the name for possible session-busy exception. Session-busy
     // can be thrown before we have attached the handler to a valid
     // SessionState, and in that case we need the name to send peer.detached
-    name = name_;               
+    name = name_;
     if (getState() && name == getState()->getId().getName())
         return;                 // Idempotent
     if (getState())
-        throw TransportBusyException(
+            throw TransportBusyException(
             QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));
     setState(name, force);
     QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());
@@ -157,8 +158,8 @@
         sendCommandPoint(getState()->senderGetCommandPoint());
 }
 
-#define CHECK_NAME(NAME, MSG) do {                                       \
-    checkAttached();                                                \
+#define CHECK_NAME(NAME, MSG) do {                                      \
+    checkAttached();                                                    \
     if (NAME != getState()->getId().getName())                          \
         throw InvalidArgumentException(                                 \
             QPID_MSG(MSG << ": incorrect session name: " << NAME        \
@@ -178,7 +179,7 @@
 
 void SessionHandler::detached(const std::string& name, uint8_t code) {
     CHECK_NAME(name, "session.detached");
-    ignoring = false;
+    awaitingDetached = false;
     if (code != session::DETACH_CODE_NORMAL)
         channelException(convert(code), "session.detached from peer.");
     else {
@@ -273,7 +274,7 @@
 void SessionHandler::sendDetach()
 {
     checkAttached();
-    ignoring = true;
+    awaitingDetached = true;
     peer.detach(getState()->getId().getName());
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Wed Oct 21 17:03:33 2009
@@ -101,14 +101,15 @@
     QPID_COMMON_EXTERN virtual void handleOut(framing::AMQFrame&);
 
     framing::ChannelHandler channel;
-    framing::AMQP_AllProxy::Session  peer;
-    bool ignoring;
-    bool sendReady, receiveReady;
-    std::string name;
 
   private:
     void checkAttached();
     void sendCommandPoint(const SessionPoint&);
+
+    framing::AMQP_AllProxy::Session  peer;
+    std::string name;
+    bool awaitingDetached;
+    bool sendReady, receiveReady;
 };
 }} // namespace qpid::amqp_0_10
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Oct 21 17:03:33 2009
@@ -95,11 +95,21 @@
 void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
 {
     Mutex::ScopedLock l(lock);
-    session->setChannel(channel == NEXT_CHANNEL ? nextChannel++ : channel);
-    boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
-    boost::shared_ptr<SessionImpl> ss = s.lock();
-    if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId()));
-    s = session;
+    for (uint16_t i = 0; i < NEXT_CHANNEL; i++) { //will at most search through channels once
+        uint16_t c = channel == NEXT_CHANNEL ? nextChannel++ : channel;
+        boost::weak_ptr<SessionImpl>& s = sessions[c];
+        boost::shared_ptr<SessionImpl> ss = s.lock();
+        if (!ss) {
+            //channel is free, we can assign it to this session
+            session->setChannel(c);
+            s = session;
+            return;
+        } else if (channel != NEXT_CHANNEL) {
+            //channel is taken and was requested explicitly so don't look for another
+            throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId()));
+        } //else channel is busy, but we can keep looking for a free one
+    }
+
 }
 
 void ConnectionImpl::handle(framing::AMQFrame& frame)
@@ -165,7 +175,6 @@
     } else {
         QPID_LOG(debug, "No security layer in place");
     }
-
     failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
 }
 
@@ -246,7 +255,7 @@
 {
     return handler;
 }
-    
+
 std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
     return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Oct 21 17:03:33 2009
@@ -65,7 +65,8 @@
       nextIn(0),
       nextOut(0),
       sendMsgCredit(0),
-      doClearDeliveryPropertiesExchange(true)
+      doClearDeliveryPropertiesExchange(true),
+      autoDetach(true)
 {
     channel.next = connectionShared.get();
 }
@@ -73,8 +74,11 @@
 SessionImpl::~SessionImpl() {
     {
         Lock l(state);
-        if (state != DETACHED) {
-            QPID_LOG(warning, "Session was not closed cleanly");
+        if (state != DETACHED && state != DETACHING) {
+            QPID_LOG(warning, "Session was not closed cleanly: " << id);
+            // Inform broker but don't wait for detached as that deadlocks.
+            // The detached will be ignored as the channel will be invalid.
+            if (autoDetach) detach();
             setState(DETACHED);
             handleClosed();
             state.waitWaiters();
@@ -816,4 +820,6 @@
     return connectionWeak.lock();
 }
 
+void SessionImpl::disableAutoDetach() { autoDetach = false; }
+
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Wed Oct 21 17:03:33 2009
@@ -132,6 +132,9 @@
 
     void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
 
+    /** Suppress sending detach in destructor. Used by cluster to build session state */
+    void disableAutoDetach();
+
 private:
     enum State {
         INACTIVE,
@@ -247,6 +250,8 @@
 
     bool doClearDeliveryPropertiesExchange;
 
+    bool autoDetach;
+    
   friend class client::SessionHandler;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=828108&r1=828107&r2=828108&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Oct 21 17:03:33 2009
@@ -313,6 +313,7 @@
     // Create a client session to update session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
     boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
+    simpl->disableAutoDetach();
     client::SessionBase_0_10Access(shadowSession).set(simpl);
     AMQP_AllProxy::ClusterConnection proxy(simpl->out);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org