You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/06/06 22:23:28 UTC

svn commit: r664114 - in /incubator/qpid/trunk/qpid/cpp: rubygen/framing.0-10/ src/ src/qpid/ src/qpid/broker/ src/qpid/client/ src/qpid/sys/

Author: aconway
Date: Fri Jun  6 13:23:28 2008
New Revision: 664114

URL: http://svn.apache.org/viewvc?rev=664114&view=rev
Log:

Added exceptions to sys::Waitable.
Fixed client side deadlock involving client::Bounds.
Fixed incorrect exception messages during connection shutdown.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h
      - copied, changed from r664034, incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Fri Jun  6 13:23:28 2008
@@ -62,14 +62,14 @@
   def reply_exceptions_h()
     h_file("#{@dir}/reply_exceptions") {
       include "qpid/Exception"
-      include "qpid/ExceptionHolder"
+      include "qpid/sys/ExceptionHolder"
       namespace(@namespace) {
         define_exceptions_for("execution", "error-code", "SessionException")
         define_exceptions_for("connection", "close-code", "ConnectionException")
         define_exceptions_for("session", "detach-code", "ChannelException")
         genl
         genl "void throwExecutionException(int code, const std::string& text);"
-        genl "void setExecutionException(ExceptionHolder& holder, int code, const std::string& text);"
+        genl "void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text);"
       }
     }
   end
@@ -81,11 +81,11 @@
       include "<assert.h>"
       namespace("qpid::framing") {
         scope("void throwExecutionException(int code, const std::string& text) {"){
-          genl "ExceptionHolder h;"
+          genl "sys::ExceptionHolder h;"
           genl "setExecutionException(h, code, text);"
           genl "h.raise();"
         }
-        scope("void setExecutionException(ExceptionHolder& holder, int code, const std::string& text) {"){        
+        scope("void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text) {"){        
           scope("switch (code) {") {
             enum = @amqp.class_("execution").domain("error-code").enum
             enum.choices.each { |c| 

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun  6 13:23:28 2008
@@ -344,7 +344,7 @@
   qpid/assert.h \
   qpid/DataDir.h \
   qpid/Exception.h \
-  qpid/ExceptionHolder.h \
+  qpid/sys/ExceptionHolder.h \
   qpid/amqp_0_10/Exception.h \
   qpid/Msg.h \
   qpid/Options.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Fri Jun  6 13:23:28 2008
@@ -34,18 +34,22 @@
 }
 
 Exception::Exception(const std::string& msg) throw() : message(msg) {
-    QPID_LOG(debug, "Exception: " << message);
+    QPID_LOG(debug, "Exception constructed: " << message);
 }
 
 Exception::~Exception() throw() {}
 
-std::string Exception::getPrefix() const { return "Exception"; }
+std::string Exception::getPrefix() const { return ""; }
 
 std::string Exception::getMessage() const { return message; }
 
 const char* Exception::what() const throw() {
-    if (whatStr.empty())
-        whatStr = getPrefix() +  ": " + message;    
+    // Construct the what string the first time it is needed.
+    if (whatStr.empty()) {
+        whatStr = getPrefix();
+        if (!whatStr.empty()) whatStr +=  ": ";
+        whatStr += message;
+    }
     return whatStr.c_str();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jun  6 13:23:28 2008
@@ -197,7 +197,7 @@
             //then do other output as needed:
             return outputTasks.doOutput();
     }catch(ConnectionException& e){
-        close(e.code, e.what(), 0, 0);
+        close(e.code, e.getMessage(), 0, 0);
     }catch(std::exception& e){
         close(541/*internal error*/, e.what(), 0, 0);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp Fri Jun  6 13:23:28 2008
@@ -1,49 +1,40 @@
 #include "Bounds.h"
 
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Waitable.h"
 
 namespace qpid {
 namespace client {
 
-using sys::Monitor;
+using sys::Waitable;
 
 Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {}
 
-bool Bounds::expand(size_t sizeRequired, bool block) 
-{
-    if (max) {
-        Monitor::ScopedLock l(lock);
-        current += sizeRequired;
-        if (block) {
-            while (current > max) {
-                QPID_LOG(debug, "Waiting for bounds: " << *this); 
-                lock.wait();
-            }
-            QPID_LOG(debug, "Bounds ok: " << *this);
-        }
-        return current <= max;
-    } else {
-        return true;
+bool Bounds::expand(size_t sizeRequired, bool block) {
+    if (!max) return true;
+    Waitable::ScopedLock l(lock);
+    current += sizeRequired;
+    if (block) {
+        Waitable::ScopedWait w(lock);
+        while (current > max) 
+            lock.wait();
     }
+    return current <= max;
 }
 
-void Bounds::reduce(size_t size)
-{
+void Bounds::reduce(size_t size) {
     if (!max || size == 0) return;
-    Monitor::ScopedLock l(lock);
+    Waitable::ScopedLock l(lock);
     if (current == 0) return;
-    bool needNotify = current > max;
     current -= std::min(size, current);
-    if (needNotify && current < max) {
-        //todo: notify one at a time, but ensure that all threads are
-        //eventually notified
-        lock.notifyAll();
+    if (current < max && lock.hasWaiters()) {
+        assert(lock.hasWaiters() == 1);
+        lock.notify();
     }
 }
 
-size_t Bounds::getCurrentSize()
-{
-    Monitor::ScopedLock l(lock);
+size_t Bounds::getCurrentSize() {
+    Waitable::ScopedLock l(lock);
     return current;
 }
 
@@ -52,4 +43,10 @@
     return out;
 }
 
+void Bounds::setException(const sys::ExceptionHolder& e) {
+    Waitable::ScopedLock l(lock);    
+    lock.setException(e);
+    lock.waitWaiters();         // Wait for waiting threads to exit.
+}
+
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h Fri Jun  6 13:23:28 2008
@@ -20,7 +20,7 @@
  * under the License.
  *
  */
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
 
 namespace qpid{
 namespace client{
@@ -32,10 +32,11 @@
     bool expand(size_t, bool block);
     void reduce(size_t);
     size_t getCurrentSize();
-
+    void setException(const sys::ExceptionHolder&);
+    
   private:
     friend std::ostream& operator<<(std::ostream&, const Bounds&);
-    sys::Monitor lock;
+    sys::Waitable lock;
     const size_t max;
     size_t current;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Fri Jun  6 13:23:28 2008
@@ -83,11 +83,10 @@
 
 void ConnectionHandler::outgoing(AMQFrame& frame)
 {
-    if (getState() == OPEN) {
+    if (getState() == OPEN) 
         out(frame);
-    } else {
-        throw Exception("Connection is not open.");
-    }
+    else
+        throw Exception(errorText.empty() ? "Connection is not open." : errorText);
 }
 
 void ConnectionHandler::waitForOpen()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Jun  6 13:23:28 2008
@@ -119,41 +119,32 @@
     closed(NORMAL, "Closed by client");
 }
 
-// Set closed flags and erase the sessions map, but keep the contents
-// so sessions can be updated outside the lock.
-ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
+
+template <class F> void ConnectionImpl::closeInternal(const F& f) {
     isClosed = true;
     connector.close();
-    SessionVector save;
-    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+    for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) {
         boost::shared_ptr<SessionImpl> s = i->second.lock();
-        if (s) save.push_back(s);
+        if (s) f(s);
     }
     sessions.clear();
-    return save;
 }
 
-void ConnectionImpl::closed(uint16_t code, const std::string& text) 
-{ 
-    SessionVector save;
-    {
-        Mutex::ScopedLock l(lock);
-        save = closeInternal(l);
-    }
-    std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text));
+void ConnectionImpl::closed(uint16_t code, const std::string& text) { 
+    Mutex::ScopedLock l(lock);
+    setException(new ConnectionException(code, text));
+    closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
 }
 
 static const std::string CONN_CLOSED("Connection closed by broker");
 
-void ConnectionImpl::shutdown() 
-{
+void ConnectionImpl::shutdown() {
     Mutex::ScopedLock l(lock);
+    // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here.
+    setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED));
     if (isClosed) return;
-    SessionVector save(closeInternal(l));
     handler.fail(CONN_CLOSED);
-    Mutex::ScopedUnlock u(lock);
-    std::for_each(save.begin(), save.end(),
-                  boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
+    closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
 }
 
 void ConnectionImpl::erase(uint16_t ch) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Jun  6 13:23:28 2008
@@ -49,7 +49,6 @@
 
 {
     typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
-    typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector;
 
     SessionMap sessions; 
     ConnectionHandler handler;
@@ -59,9 +58,8 @@
     bool isClosed;
     bool isClosing;
 
-    template <class F> void detachAll(const F&);
+    template <class F> void closeInternal(const F&);
 
-    SessionVector closeInternal(const sys::Mutex::ScopedLock&);
     void incoming(framing::AMQFrame& frame);    
     void closed(uint16_t, const std::string&);
     void idleOut();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Fri Jun  6 13:23:28 2008
@@ -69,12 +69,14 @@
 }
 
 SessionImpl::~SessionImpl() {
-    Lock l(state);
-    if (state != DETACHED) {
-        QPID_LOG(warning, "Detaching deleted session");
-        setState(DETACHED);
-        handleClosed();
-        state.waitWaiters();
+    {
+        Lock l(state);
+        if (state != DETACHED) {
+            QPID_LOG(warning, "Detaching deleted session");
+            setState(DETACHED);
+            handleClosed();
+            state.waitWaiters();
+        }
     }
     connection->erase(channel);
 }

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h (from r664034, incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h&r1=664034&r2=664114&rev=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h Fri Jun  6 13:23:28 2008
@@ -23,9 +23,11 @@
  */
 
 #include "qpid/memory.h"
-#include <memory>
+#include <boost/shared_ptr.hpp>
+
 
 namespace qpid {
+namespace sys {
 
 struct Raisable {
     virtual ~Raisable() {};
@@ -40,14 +42,14 @@
 class ExceptionHolder : public Raisable {
   public:
     ExceptionHolder() {}
-    ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {}
+    // Use default copy & assign.
+    
     /** Take ownership of ex */
     template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); }
-    template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); }
+    template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); }
 
-    ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; }
     template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; }
-    template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; }
+    template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; }
         
     void raise() const { if (wrapper.get()) wrapper->raise() ; }
     std::string what() const { return wrapper->what(); }
@@ -60,14 +62,14 @@
         Wrapper(Ex* ptr) : exception(ptr) {}
         void raise() const { throw *exception; }
         std::string what() const { return exception->what(); }
-        std::auto_ptr<Ex> exception;
+        boost::shared_ptr<Ex> exception;
     };
     template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); }
-    std::auto_ptr<Raisable> wrapper;
-    
+    boost::shared_ptr<Raisable> wrapper;
 };
     
 
-} // namespace qpid
+}} // namespace qpid::sys
+
 
 #endif  /*!QPID_EXCEPTIONHOLDER_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h Fri Jun  6 13:23:28 2008
@@ -21,8 +21,8 @@
  *
  */
 
-#include "Monitor.h"
-
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ExceptionHolder.h"
 #include <assert.h>
 
 namespace qpid {
@@ -31,14 +31,18 @@
 /**
  * A monitor that keeps track of waiting threads.  Threads declare a
  * ScopedWait around wait() inside a ScopedLock to be considered
- * waiters.
+ * waiters. 
+ *
+ * Allows waiting threads to be interrupted by an exception. 
  */
 class Waitable : public Monitor {
   public:
     Waitable() : waiters(0) {}
 
+    ~Waitable() { assert(waiters == 0); }
+
     /** Use this inside a scoped lock around the
-     * call to Monitor::wait to be counted as a waiter
+     * call to wait() to be counted as a waiter.
      */
     struct ScopedWait {
         Waitable& w;
@@ -46,22 +50,55 @@
         ~ScopedWait() { if (--w.waiters==0) w.notifyAll(); }
     };
 
-    /** Block till there are no more ScopedWaits.
+    /** Block till there are no more waiters in ScopedWaits.
+     * waitWaiters() does not raise an exception even if waiters
+     * were interrupted by one.
      *@pre Must be called inside a ScopedLock but NOT a ScopedWait.
      */
     void waitWaiters() {
         while (waiters != 0) 
-            wait();
+            Monitor::wait();
     }
 
     /** Returns the number of outstanding ScopedWaits.
      * Must be called with the lock held.
      */
-    size_t hasWaiters() { return waiters; }
-    
+    size_t hasWaiters() const {
+        return waiters;
+    }
+
+    /** Set an exception to interrupt waiters in ScopedWait.
+     * Must be called with the lock held.
+     */
+    void setException(const ExceptionHolder& e) {
+        exception = e;
+        notifyAll();
+        
+    }
+
+    /** Throws an exception if one is set before or during the wait. */
+    void wait() {
+        ExCheck e(exception);
+        Monitor::wait();
+    }
+
+    /** Throws an exception if one is set before or during the wait. */
+    bool wait(const AbsTime& absoluteTime) {
+        ExCheck e(exception);
+        return Monitor::wait(absoluteTime);
+    }
+
+    ExceptionHolder exception;
+
   private:
-  friend struct ScopedWait;
+    struct ExCheck {
+        const ExceptionHolder& exception;
+        ExCheck(const ExceptionHolder& e) : exception(e) { e.raise(); }
+        ~ExCheck() { exception.raise(); }
+    };
+        
     size_t waiters;
+  friend struct ScopedWait;
 };
 
 }} // namespace qpid::sys