You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/04/18 01:29:08 UTC

svn commit: r935275 - in /qpid/trunk/qpid/cpp/src/qpid/client: Connection.cpp ConnectionImpl.cpp ConnectionImpl.h

Author: gsim
Date: Sat Apr 17 23:29:08 2010
New Revision: 935275

URL: http://svn.apache.org/viewvc?rev=935275&view=rev
Log:
An alternative attempt at ensuring ConnectionImpl is not deleted before the IO thread is finished with it.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=935275&r1=935274&r2=935275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Sat Apr 17 23:29:08 2010
@@ -122,7 +122,7 @@ void Connection::open(const ConnectionSe
     if (isOpen())
         throw Exception(QPID_MSG("Connection::open() was already called"));
 
-    impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+    impl = ConnectionImpl::create(version, settings);
     impl->open();
     if ( failureCallback )
         impl->registerFailureCallback ( failureCallback );

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=935275&r1=935274&r2=935275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Sat Apr 17 23:29:08 2010
@@ -162,14 +162,20 @@ void ConnectionImpl::init() {
     (void) theIO();
 }
 
+boost::shared_ptr<ConnectionImpl> ConnectionImpl::create(framing::ProtocolVersion version, const ConnectionSettings& settings)
+{
+    boost::shared_ptr<ConnectionImpl> instance(new ConnectionImpl(version, settings), boost::bind(&ConnectionImpl::release, _1));
+    return instance;
+}
+
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
       version(v),
       nextChannel(1),
-      shutdownComplete(false)
+      shutdownComplete(false),
+      released(false)
 {
-    QPID_LOG(debug, "ConnectionImpl created for " << version.toString());
     handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
     handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
     handler.onClose = boost::bind(&ConnectionImpl::closed, this,
@@ -182,12 +188,6 @@ ConnectionImpl::ConnectionImpl(framing::
 const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max();
 
 ConnectionImpl::~ConnectionImpl() {
-    if (connector) {
-        connector->close();
-        //wait until we get the shutdown callback to ensure that the
-        //io threads will not call back on us after deletion
-        waitForShutdownComplete();
-    }
     theIO().sub();
 }
 
@@ -249,6 +249,7 @@ void ConnectionImpl::open()
     connector->setShutdownHandler(this);
     try {
         connector->connect(host, port);
+    
     } catch (const std::exception& e) {
         QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
         connector.reset();
@@ -340,16 +341,33 @@ void ConnectionImpl::closed(uint16_t cod
 }
 
 void ConnectionImpl::shutdown() {
-    //May need to take a temporary reference to ourselves to prevent
-    //our destructor being called until after we have notified of
-    //shutdown completion; the destructor may be called as a result of
-    //the call to failedConnection().
-    boost::shared_ptr<ConnectionImpl> temp;
     if (!handler.isClosed()) {
-        temp = shared_from_this();
         failedConnection();
     }
-    notifyShutdownComplete();
+    bool canDelete;
+    {
+        Mutex::ScopedLock l(lock);
+        //association with IO thread is now ended
+        shutdownComplete = true;
+        //If we have already been released, we can now delete ourselves
+        canDelete = released;
+    }
+    if (canDelete) delete this;
+}
+
+void ConnectionImpl::release() {
+    bool isActive;
+    {
+        Mutex::ScopedLock l(lock);
+        released = true;
+        isActive = connector && !shutdownComplete;
+    }
+    //If we are still active - i.e. associated with an IO thread -
+    //then we cannot delete ourselves yet, but must wait for the
+    //shutdown callback which we can trigger by calling
+    //connector.close()
+    if (isActive) connector->close();
+    else delete this;
 }
 
 static const std::string CONN_CLOSED("Connection closed");
@@ -397,16 +415,4 @@ boost::shared_ptr<SessionImpl>  Connecti
     return simpl;
 }
 
-void ConnectionImpl::waitForShutdownComplete()
-{
-    Mutex::ScopedLock l(lock);
-    while(!shutdownComplete) lock.wait();
-}
-void ConnectionImpl::notifyShutdownComplete()
-{
-    Mutex::ScopedLock l(lock);
-    shutdownComplete = true;
-    lock.notifyAll();
-}
-
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=935275&r1=935274&r2=935275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Sat Apr 17 23:29:08 2010
@@ -26,7 +26,7 @@
 #include "qpid/client/ConnectionHandler.h"
 
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/sys/ShutdownHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
 
@@ -48,9 +48,7 @@ class ConnectionImpl : public Bounds,
                        public sys::TimeoutHandler, 
                        public sys::ShutdownHandler,
                        public boost::enable_shared_from_this<ConnectionImpl>
-
 {
-    friend class Connection;
     typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
 
     static const uint16_t NEXT_CHANNEL;
@@ -60,27 +58,28 @@ class ConnectionImpl : public Bounds,
     boost::scoped_ptr<Connector> connector;
     framing::ProtocolVersion version;
     uint16_t nextChannel;
-    sys::Monitor lock;
+    sys::Mutex lock;
     bool shutdownComplete;
+    bool released;
 
     boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask;
 
     template <class F> void closeInternal(const F&);
 
-    static void init();
     void incoming(framing::AMQFrame& frame);
     void closed(uint16_t, const std::string&);
     void idleOut();
     void idleIn();
     void shutdown();
     void failedConnection();
-    void waitForShutdownComplete();
-    void notifyShutdownComplete();
+    void release();
+    ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
 
     boost::function<void ()> failureCallback;
 
   public:
-    ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
+    static void init();
+    static boost::shared_ptr<ConnectionImpl> create(framing::ProtocolVersion version, const ConnectionSettings& settings);
     ~ConnectionImpl();
     
     void open();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org