You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/06/06 22:23:28 UTC
svn commit: r664114 - in /incubator/qpid/trunk/qpid/cpp:
rubygen/framing.0-10/ src/ src/qpid/ src/qpid/broker/ src/qpid/client/
src/qpid/sys/
Author: aconway
Date: Fri Jun 6 13:23:28 2008
New Revision: 664114
URL: http://svn.apache.org/viewvc?rev=664114&view=rev
Log:
Added exceptions to sys::Waitable.
Fixed client side deadlock involving client::Bounds.
Fixed incorrect exception messages during connection shutdown.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h
- copied, changed from r664034, incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Fri Jun 6 13:23:28 2008
@@ -62,14 +62,14 @@
def reply_exceptions_h()
h_file("#{@dir}/reply_exceptions") {
include "qpid/Exception"
- include "qpid/ExceptionHolder"
+ include "qpid/sys/ExceptionHolder"
namespace(@namespace) {
define_exceptions_for("execution", "error-code", "SessionException")
define_exceptions_for("connection", "close-code", "ConnectionException")
define_exceptions_for("session", "detach-code", "ChannelException")
genl
genl "void throwExecutionException(int code, const std::string& text);"
- genl "void setExecutionException(ExceptionHolder& holder, int code, const std::string& text);"
+ genl "void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text);"
}
}
end
@@ -81,11 +81,11 @@
include "<assert.h>"
namespace("qpid::framing") {
scope("void throwExecutionException(int code, const std::string& text) {"){
- genl "ExceptionHolder h;"
+ genl "sys::ExceptionHolder h;"
genl "setExecutionException(h, code, text);"
genl "h.raise();"
}
- scope("void setExecutionException(ExceptionHolder& holder, int code, const std::string& text) {"){
+ scope("void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text) {"){
scope("switch (code) {") {
enum = @amqp.class_("execution").domain("error-code").enum
enum.choices.each { |c|
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun 6 13:23:28 2008
@@ -344,7 +344,7 @@
qpid/assert.h \
qpid/DataDir.h \
qpid/Exception.h \
- qpid/ExceptionHolder.h \
+ qpid/sys/ExceptionHolder.h \
qpid/amqp_0_10/Exception.h \
qpid/Msg.h \
qpid/Options.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Fri Jun 6 13:23:28 2008
@@ -34,18 +34,22 @@
}
Exception::Exception(const std::string& msg) throw() : message(msg) {
- QPID_LOG(debug, "Exception: " << message);
+ QPID_LOG(debug, "Exception constructed: " << message);
}
Exception::~Exception() throw() {}
-std::string Exception::getPrefix() const { return "Exception"; }
+std::string Exception::getPrefix() const { return ""; }
std::string Exception::getMessage() const { return message; }
const char* Exception::what() const throw() {
- if (whatStr.empty())
- whatStr = getPrefix() + ": " + message;
+ // Construct the what string the first time it is needed.
+ if (whatStr.empty()) {
+ whatStr = getPrefix();
+ if (!whatStr.empty()) whatStr += ": ";
+ whatStr += message;
+ }
return whatStr.c_str();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jun 6 13:23:28 2008
@@ -197,7 +197,7 @@
//then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
- close(e.code, e.what(), 0, 0);
+ close(e.code, e.getMessage(), 0, 0);
}catch(std::exception& e){
close(541/*internal error*/, e.what(), 0, 0);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.cpp Fri Jun 6 13:23:28 2008
@@ -1,49 +1,40 @@
#include "Bounds.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Waitable.h"
namespace qpid {
namespace client {
-using sys::Monitor;
+using sys::Waitable;
Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {}
-bool Bounds::expand(size_t sizeRequired, bool block)
-{
- if (max) {
- Monitor::ScopedLock l(lock);
- current += sizeRequired;
- if (block) {
- while (current > max) {
- QPID_LOG(debug, "Waiting for bounds: " << *this);
- lock.wait();
- }
- QPID_LOG(debug, "Bounds ok: " << *this);
- }
- return current <= max;
- } else {
- return true;
+bool Bounds::expand(size_t sizeRequired, bool block) {
+ if (!max) return true;
+ Waitable::ScopedLock l(lock);
+ current += sizeRequired;
+ if (block) {
+ Waitable::ScopedWait w(lock);
+ while (current > max)
+ lock.wait();
}
+ return current <= max;
}
-void Bounds::reduce(size_t size)
-{
+void Bounds::reduce(size_t size) {
if (!max || size == 0) return;
- Monitor::ScopedLock l(lock);
+ Waitable::ScopedLock l(lock);
if (current == 0) return;
- bool needNotify = current > max;
current -= std::min(size, current);
- if (needNotify && current < max) {
- //todo: notify one at a time, but ensure that all threads are
- //eventually notified
- lock.notifyAll();
+ if (current < max && lock.hasWaiters()) {
+ assert(lock.hasWaiters() == 1);
+ lock.notify();
}
}
-size_t Bounds::getCurrentSize()
-{
- Monitor::ScopedLock l(lock);
+size_t Bounds::getCurrentSize() {
+ Waitable::ScopedLock l(lock);
return current;
}
@@ -52,4 +43,10 @@
return out;
}
+void Bounds::setException(const sys::ExceptionHolder& e) {
+ Waitable::ScopedLock l(lock);
+ lock.setException(e);
+ lock.waitWaiters(); // Wait for waiting threads to exit.
+}
+
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Bounds.h Fri Jun 6 13:23:28 2008
@@ -20,7 +20,7 @@
* under the License.
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
namespace qpid{
namespace client{
@@ -32,10 +32,11 @@
bool expand(size_t, bool block);
void reduce(size_t);
size_t getCurrentSize();
-
+ void setException(const sys::ExceptionHolder&);
+
private:
friend std::ostream& operator<<(std::ostream&, const Bounds&);
- sys::Monitor lock;
+ sys::Waitable lock;
const size_t max;
size_t current;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Fri Jun 6 13:23:28 2008
@@ -83,11 +83,10 @@
void ConnectionHandler::outgoing(AMQFrame& frame)
{
- if (getState() == OPEN) {
+ if (getState() == OPEN)
out(frame);
- } else {
- throw Exception("Connection is not open.");
- }
+ else
+ throw Exception(errorText.empty() ? "Connection is not open." : errorText);
}
void ConnectionHandler::waitForOpen()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Jun 6 13:23:28 2008
@@ -119,41 +119,32 @@
closed(NORMAL, "Closed by client");
}
-// Set closed flags and erase the sessions map, but keep the contents
-// so sessions can be updated outside the lock.
-ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
+
+template <class F> void ConnectionImpl::closeInternal(const F& f) {
isClosed = true;
connector.close();
- SessionVector save;
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
- if (s) save.push_back(s);
+ if (s) f(s);
}
sessions.clear();
- return save;
}
-void ConnectionImpl::closed(uint16_t code, const std::string& text)
-{
- SessionVector save;
- {
- Mutex::ScopedLock l(lock);
- save = closeInternal(l);
- }
- std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text));
+void ConnectionImpl::closed(uint16_t code, const std::string& text) {
+ Mutex::ScopedLock l(lock);
+ setException(new ConnectionException(code, text));
+ closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
static const std::string CONN_CLOSED("Connection closed by broker");
-void ConnectionImpl::shutdown()
-{
+void ConnectionImpl::shutdown() {
Mutex::ScopedLock l(lock);
+ // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here.
+ setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED));
if (isClosed) return;
- SessionVector save(closeInternal(l));
handler.fail(CONN_CLOSED);
- Mutex::ScopedUnlock u(lock);
- std::for_each(save.begin(), save.end(),
- boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
+ closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Jun 6 13:23:28 2008
@@ -49,7 +49,6 @@
{
typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
- typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector;
SessionMap sessions;
ConnectionHandler handler;
@@ -59,9 +58,8 @@
bool isClosed;
bool isClosing;
- template <class F> void detachAll(const F&);
+ template <class F> void closeInternal(const F&);
- SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
void idleOut();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Fri Jun 6 13:23:28 2008
@@ -69,12 +69,14 @@
}
SessionImpl::~SessionImpl() {
- Lock l(state);
- if (state != DETACHED) {
- QPID_LOG(warning, "Detaching deleted session");
- setState(DETACHED);
- handleClosed();
- state.waitWaiters();
+ {
+ Lock l(state);
+ if (state != DETACHED) {
+ QPID_LOG(warning, "Detaching deleted session");
+ setState(DETACHED);
+ handleClosed();
+ state.waitWaiters();
+ }
}
connection->erase(channel);
}
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h (from r664034, incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h&r1=664034&r2=664114&rev=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/ExceptionHolder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ExceptionHolder.h Fri Jun 6 13:23:28 2008
@@ -23,9 +23,11 @@
*/
#include "qpid/memory.h"
-#include <memory>
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
+namespace sys {
struct Raisable {
virtual ~Raisable() {};
@@ -40,14 +42,14 @@
class ExceptionHolder : public Raisable {
public:
ExceptionHolder() {}
- ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {}
+ // Use default copy & assign.
+
/** Take ownership of ex */
template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); }
- template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); }
+ template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); }
- ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; }
template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; }
- template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; }
+ template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; }
void raise() const { if (wrapper.get()) wrapper->raise() ; }
std::string what() const { return wrapper->what(); }
@@ -60,14 +62,14 @@
Wrapper(Ex* ptr) : exception(ptr) {}
void raise() const { throw *exception; }
std::string what() const { return exception->what(); }
- std::auto_ptr<Ex> exception;
+ boost::shared_ptr<Ex> exception;
};
template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); }
- std::auto_ptr<Raisable> wrapper;
-
+ boost::shared_ptr<Raisable> wrapper;
};
-} // namespace qpid
+}} // namespace qpid::sys
+
#endif /*!QPID_EXCEPTIONHOLDER_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h?rev=664114&r1=664113&r2=664114&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h Fri Jun 6 13:23:28 2008
@@ -21,8 +21,8 @@
*
*/
-#include "Monitor.h"
-
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ExceptionHolder.h"
#include <assert.h>
namespace qpid {
@@ -31,14 +31,18 @@
/**
* A monitor that keeps track of waiting threads. Threads declare a
* ScopedWait around wait() inside a ScopedLock to be considered
- * waiters.
+ * waiters.
+ *
+ * Allows waiting threads to be interrupted by an exception.
*/
class Waitable : public Monitor {
public:
Waitable() : waiters(0) {}
+ ~Waitable() { assert(waiters == 0); }
+
/** Use this inside a scoped lock around the
- * call to Monitor::wait to be counted as a waiter
+ * call to wait() to be counted as a waiter.
*/
struct ScopedWait {
Waitable& w;
@@ -46,22 +50,55 @@
~ScopedWait() { if (--w.waiters==0) w.notifyAll(); }
};
- /** Block till there are no more ScopedWaits.
+ /** Block till there are no more waiters in ScopedWaits.
+ * waitWaiters() does not raise an exception even if waiters
+ * were interrupted by one.
*@pre Must be called inside a ScopedLock but NOT a ScopedWait.
*/
void waitWaiters() {
while (waiters != 0)
- wait();
+ Monitor::wait();
}
/** Returns the number of outstanding ScopedWaits.
* Must be called with the lock held.
*/
- size_t hasWaiters() { return waiters; }
-
+ size_t hasWaiters() const {
+ return waiters;
+ }
+
+ /** Set an exception to interrupt waiters in ScopedWait.
+ * Must be called with the lock held.
+ */
+ void setException(const ExceptionHolder& e) {
+ exception = e;
+ notifyAll();
+
+ }
+
+ /** Throws an exception if one is set before or during the wait. */
+ void wait() {
+ ExCheck e(exception);
+ Monitor::wait();
+ }
+
+ /** Throws an exception if one is set before or during the wait. */
+ bool wait(const AbsTime& absoluteTime) {
+ ExCheck e(exception);
+ return Monitor::wait(absoluteTime);
+ }
+
+ ExceptionHolder exception;
+
private:
- friend struct ScopedWait;
+ struct ExCheck {
+ const ExceptionHolder& exception;
+ ExCheck(const ExceptionHolder& e) : exception(e) { e.raise(); }
+ ~ExCheck() { exception.raise(); }
+ };
+
size_t waiters;
+ friend struct ScopedWait;
};
}} // namespace qpid::sys