You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/04/11 16:29:07 UTC

svn commit: r764204 - in /qpid/trunk/qpid/cpp: src/ src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Sat Apr 11 14:29:04 2009
New Revision: 764204

URL: http://svn.apache.org/viewvc?rev=764204&view=rev
Log:

Fix issues when cluster is run with persistence enabled.

- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue.
- Enable persistence in cluster tests.
- Correct message status in DeliveryRecord updates.
- Remove qpid.update queue when update is complete, to avoid it becoming persistent.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h   (with props)
    qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp   (with props)
    qpid/trunk/qpid/cpp/src/tests/test_store.cpp   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
    qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
    qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/cluster.mk
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
    qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
    qpid/trunk/qpid/cpp/src/tests/run_failover_soak
    qpid/trunk/qpid/cpp/src/tests/start_cluster
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Sat Apr 11 14:29:04 2009
@@ -56,6 +56,8 @@
   qpid/cluster/Dispatchable.h			\
   qpid/cluster/UpdateClient.cpp			\
   qpid/cluster/UpdateClient.h			\
+  qpid/cluster/ErrorCheck.cpp			\
+  qpid/cluster/ErrorCheck.h			\
   qpid/cluster/Event.cpp			\
   qpid/cluster/Event.h				\
   qpid/cluster/EventFrame.h			\
@@ -70,6 +72,7 @@
   qpid/cluster/Multicaster.h			\
   qpid/cluster/McastFrameHandler.h		\
   qpid/cluster/NoOpConnectionOutputHandler.h	\
+  qpid/cluster/StallConnectionOutputHandler.h	\
   qpid/cluster/OutputInterceptor.cpp		\
   qpid/cluster/OutputInterceptor.h		\
   qpid/cluster/PollerDispatch.cpp		\

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -88,6 +88,7 @@
     }
     catch(const SessionException& e) {
         QPID_LOG(error, "Execution exception: " << e.what());
+        executionException(e.code, e.what()); // Let subclass handle this first.
         framing::AMQP_AllProxy::Execution  execution(channel);
         AMQMethodBody* m = f.getMethod();
         SequenceNumber commandId;
@@ -98,6 +99,7 @@
     }
     catch(const ChannelException& e){
         QPID_LOG(error, "Channel exception: " << e.what());
+        channelException(e.code, e.what()); // Let subclass handle this first.
         peer.detached(name, e.code);
     }
     catch(const ConnectionException& e) {

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Sat Apr 11 14:29:04 2009
@@ -87,8 +87,9 @@
     QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m);
 
     virtual void setState(const std::string& sessionName, bool force) = 0;
-    virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
     virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
+    virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0;
+    virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0;
     virtual void detaching() = 0;
 
     // Notification of events

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Sat Apr 11 14:29:04 2009
@@ -57,7 +57,8 @@
     mgmtObject(0),
     links(broker_.getLinks()),
     agent(0),
-    timer(broker_.getTimer())
+    timer(broker_.getTimer()),
+    errorListener(0)
 {
     Manageable* parent = broker.GetVhostObject();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Sat Apr 11 14:29:04 2009
@@ -66,6 +66,17 @@
                    public RefCounted
 {
   public:
+    /**
+     * Listener that can be registered with a Connection to be informed of errors.
+     */
+    class ErrorListener
+    {
+      public:
+        virtual ~ErrorListener() {}
+        virtual void sessionError(uint16_t channel, const std::string&) = 0;
+        virtual void connectionError(const std::string&) = 0;
+    };
+
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
     ~Connection ();
 
@@ -101,6 +112,9 @@
     const std::string& getMgmtId() const { return mgmtId; }
     management::ManagementAgent* getAgent() const { return agent; }
     void setFederationLink(bool b);
+    /** Connection does not delete the listener. 0 resets. */
+    void setErrorListener(ErrorListener* l) { errorListener=l; }
+    ErrorListener* getErrorListener() { return errorListener; }
     
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
@@ -112,6 +126,7 @@
 
     void sendClose();
     void setSecureConnection(SecureConnection* secured);
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@
     management::ManagementAgent* agent;
     Timer& timer;
     boost::intrusive_ptr<TimerTask> heartbeatTimer;
+    ErrorListener* errorListener;
+
   public:
     qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -64,13 +64,16 @@
 void ConnectionHandler::handle(framing::AMQFrame& frame)
 {
     AMQMethodBody* method=frame.getBody()->getMethod();
+    Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
     try{
         if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
             handler->connection.getChannel(frame.getChannel()).in(frame);
         }
     }catch(ConnectionException& e){
+        if (errorListener) errorListener->connectionError(e.what());
         handler->proxy.close(e.code, e.what());
     }catch(std::exception& e){
+        if (errorListener) errorListener->connectionError(e.what());
         handler->proxy.close(541/*internal error*/, e.what());
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -45,14 +45,20 @@
 MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
 } // namespace
 
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
-    handleDetach();
-}
-
 void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+    // NOTE: must tell the error listener _before_ calling connection.close()
+    if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
     connection.close(code, msg);
 }
 
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
+    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
+    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
 ConnectionState& SessionHandler::getConnection() { return connection; }
 
 const ConnectionState& SessionHandler::getConnection() const { return connection; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Sat Apr 11 14:29:04 2009
@@ -73,8 +73,9 @@
     virtual void setState(const std::string& sessionName, bool force);
     virtual qpid::SessionState* getState();
     virtual framing::FrameHandler* getInHandler();
-    virtual void channelException(framing::session::DetachCode code, const std::string& msg);
     virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+    virtual void channelException(framing::session::DetachCode, const std::string& msg);
+    virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
     virtual void detaching();
     virtual void readyToSend();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Apr 11 14:29:04 2009
@@ -36,6 +36,7 @@
 #include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -63,6 +64,7 @@
 using namespace qpid::sys;
 using namespace std;
 using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
 using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
@@ -77,9 +79,10 @@
 
     void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
-    void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+    void configChange(const std::string& current) { cluster.configChange(member, current, l); }
     void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+    void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +115,8 @@
     discarding(true),
     state(INIT),
     lastSize(0),
-    lastBroker(false)
+    lastBroker(false),
+    error(*this)
 {
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
@@ -195,14 +199,19 @@
     leave(l);
 }
 
+#define LEAVE_TRY(STMT) try { STMT; } \
+    catch (const std::exception& e) { \
+        QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+    } do {} while(0)
+
 void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        try { broker.shutdown(); }
-        catch (const std::exception& e) {
-            QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
-        }
+        // Finalize connections now to avoid problems later in destructor.
+        LEAVE_TRY(localConnections.clear());
+        LEAVE_TRY(connections.clear());
+        LEAVE_TRY(broker.shutdown());
     }
 }
 
@@ -254,10 +263,22 @@
         QPID_LOG(trace, *this << " DROP: " << e);
 }
 
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+    Mutex::ScopedLock l(lock);
+    error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& e) {
     Mutex::ScopedLock l(lock);
+    // Process each frame through the error checker.
+    error.delivered(e);
+    while (error.canProcess())  // There is a frame ready to process.
+        processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
         QPID_LOG(trace, *this << " DLVR: " << e);
         ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +286,8 @@
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
     else if (state >= CATCHUP) {
-        QPID_LOG(trace, *this << " DLVR:  " << e);
+        map.incrementFrameSeq();
+        QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ":  " << e);
         ConnectionPtr connection = getConnection(e.connectionId, l);
         if (connection)
             connection->deliveredFrame(e);
@@ -357,8 +379,8 @@
     broker.getQueueEvents().enable();
 }
 
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
-    bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+    bool memberChange = map.configChange(current);
     if (state == LEFT) return;
     
     if (!map.isAlive(self)) {  // Final config change.
@@ -600,8 +622,13 @@
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
-    static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
-    return o << cluster.self << "(" << STATE[cluster.state] << ")";
+    static const char* STATE[] = {
+        "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+    };
+    assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+    o << cluster.self << "(" << STATE[cluster.state];
+    if (cluster.error.isUnresolved()) o << "/error";
+    return o << ")";
 }
 
 MemberId Cluster::getId() const {
@@ -635,4 +662,13 @@
     expiryPolicy->deliverExpire(id);
 }
 
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+    // If we receive an errorCheck here, it's because we have processed past the point
+    // of the error, so respond with ERROR_TYPE_NONE.
+    assert(map.getFrameSeq() >= frameSeq);
+    if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if it's already NONE.
+        mcast.mcastControl(
+            ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
 #include "ClusterSettings.h"
 #include "Cpg.h"
 #include "Decoder.h"
+#include "ErrorCheck.h"
 #include "Event.h"
 #include "EventFrame.h"
 #include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@
 
     void deliverFrame(const EventFrame&);
 
+    // Called in deliverFrame thread to indicate an error from the broker.
+    void flagError(Connection&, ErrorCheck::ErrorType);
+    void connectionError();
+
     // Called only during update by Connection::shadowReady
     Decoder& getDecoder() { return decoder; }
 
@@ -132,13 +137,15 @@
 
     // == Called in deliverFrameQueue thread
     void deliveredFrame(const EventFrame&); 
+    void processFrame(const EventFrame&, Lock&); 
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
-    void configChange(const MemberId&, const std::string& addresses, Lock& l);
+    void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
+    void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
     void shutdown(const MemberId&, Lock&);
 
     // Helper functions
@@ -216,11 +223,13 @@
     Decoder decoder;
     bool discarding;
     
+
     // Remaining members are protected by lock.
-    // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+    // TODO aconway 2009-03-06: Most of these members are also only used in
     // deliverFrameQueue thread or during stall. Review and separate members
     // that require a lock, drop lock when not needed.
-    // 
+
     mutable sys::Monitor lock;
 
 
@@ -243,7 +252,7 @@
     bool lastBroker;
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;
-
+    ErrorCheck error;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Sat Apr 11 14:29:04 2009
@@ -33,6 +33,13 @@
 
 namespace cluster {
 
+ClusterMap::Set ClusterMap::decode(const std::string& s) {
+    Set set;
+    for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)  
+        set.insert(MemberId(std::string(i, i+8)));
+    return set;
+}
+
 namespace {
 
 void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@
 
 }
 
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
 
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
     alive.insert(id);
     if (isMember)
         members[id] = url;
@@ -64,7 +71,9 @@
         joiners[id] = url;
 }
 
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+  : frameSeq(frameSeq_)
+{
     std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
     std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
 }
@@ -78,22 +87,7 @@
     }
     b.getMembers().clear();
     std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
-    cpg_address *current, int nCurrent,
-    cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int /*nJoined*/)
-{
-    cpg_address* a;
-    bool memberChange=false;
-    for (a = left; a != left+nLeft; ++a) {
-        memberChange = memberChange || members.erase(*a);
-        joiners.erase(*a);
-    }
-    alive.clear();
-    std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
-    return memberChange;
+    b.setFrameSeq(frameSeq);
 }
 
 Url ClusterMap::getUrl(const Map& map, const  MemberId& id) {
@@ -123,8 +117,13 @@
     return urls;
 }
 
-ClusterMap::Set ClusterMap::getAlive() const {
-    return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+    Set s;
+    std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+                   boost::bind(&Map::value_type::first, _1));
+    return s;
 }
 
 std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@
 
 bool ClusterMap::configChange(const std::string& addresses) {
     bool memberChange = false;
-    Set update;
+    Set update = decode(addresses);
     for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)  
         update.insert(MemberId(std::string(i, i+8)));
     Set removed;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Sat Apr 11 14:29:04 2009
@@ -38,26 +38,26 @@
 namespace qpid {
 namespace cluster {
 
+typedef std::set<MemberId> MemberSet;
+
 /**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
  */
 class ClusterMap {
   public:
     typedef std::map<MemberId, Url> Map;
     typedef std::set<MemberId> Set;
 
+    static Set decode(const std::string&);
+        
     ClusterMap();
     ClusterMap(const MemberId& id, const Url& url, bool isReady);
-    ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
 
     /** Update from config change.
      *@return true if member set changed.
      */
-    bool configChange(
-        cpg_address *current, int nCurrent,
-        cpg_address *left, int nLeft,
-        cpg_address *joined, int nJoined);
-
     bool configChange(const std::string& addresses);
 
     bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@
     std::vector<std::string> memberIds() const;
     std::vector<Url> memberUrls() const;
     Set getAlive() const;
+    Set getMembers() const;
 
     bool updateRequest(const MemberId& id, const std::string& url);       
     /** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@
      * Utility method to return intersection of two member sets
      */
     static Set intersection(const Set& a, const Set& b);
+
+    uint64_t getFrameSeq() { return frameSeq; }
+    uint64_t incrementFrameSeq() { return ++frameSeq; }
+    
   private:
     Url getUrl(const Map& map, const  MemberId& id);
     
     Map joiners, members;
     Set alive;
+    uint64_t frameSeq;
 
   friend std::ostream& operator<<(std::ostream&, const Map&);
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Sat Apr 11 14:29:04 2009
@@ -56,8 +56,16 @@
 namespace cluster {
 
 using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+    void handle(framing::AMQFrame&) {}
+};
 
-NoOpConnectionOutputHandler Connection::discardHandler;
 
 namespace {
 sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +97,8 @@
         connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
         connection.setClientThrottling(false);              // Disable client throttling, done by active node.
     }
+    if (!isCatchUp())
+        connection.setErrorListener(this);
 }
 
 void Connection::giveReadCredit(int credit) {
@@ -97,6 +107,7 @@
 }
 
 Connection::~Connection() {
+    connection.setErrorListener(0);
     QPID_LOG(debug, cluster << " deleted connection: " << *this);
 }
 
@@ -126,7 +137,7 @@
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection.getOutput().send(ok);
-            output.closeOutput(discardHandler);
+            output.closeOutput();
             catchUp = false;
         }
         else
@@ -156,8 +167,8 @@
     {
         if (f.type == DATA) // incoming data frames to broker::Connection
             connection.received(const_cast<AMQFrame&>(f.frame)); 
-        else {                    // frame control, send frame via SessionState
-            broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+        else {           // frame control, send frame via SessionState
+            broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
             if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
         }
     }
@@ -180,7 +191,7 @@
             // This was a local replicated connection. Multicast a deliver
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
-            output.closeOutput(discardHandler);
+            output.closeOutput();
             cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
         }
     }
@@ -275,13 +286,14 @@
     QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
     self = shadowId;
     connection.setUserId(username);
-    // OK to use decoder here because we are stalled for update.
+    // OK to use decoder here because cluster is stalled for update.
     cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+    connection.setErrorListener(this);
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
-    cluster.updateInDone(ClusterMap(joiners, members));
+    cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
     self.second = 0;        // Mark this as completed update connection.
 }
 
@@ -305,7 +317,9 @@
 }
 
 broker::QueuedMessage Connection::getUpdateMessage() {
-    broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+    shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+    assert(!updateq->isDurable());
+    broker::QueuedMessage m = updateq->get();
     if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
     return m;
 }
@@ -342,15 +356,15 @@
 
     // If the message was unacked, the newbie broker must place
     // it in its messageStore.
-    if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+    if ( m.payload && m.payload->isPersistent() && acquired && !ended)
         queue->enqueue ( 0, m.payload );
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
-    shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
-    if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
-    q->setPosition(position);
-}
+        shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+        if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+        q->setPosition(position);
+    }
 
 void Connection::expiryId(uint64_t id) {
     cluster.getExpiryPolicy().setId(id);
@@ -407,7 +421,14 @@
     QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
 }
 
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+    cluster.flagError(*this, ERROR_TYPE_SESSION);
+    
+}
+
+void Connection::connectionError(const std::string& ) {
+    cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
 
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Sat Apr 11 14:29:04 2009
@@ -25,7 +25,6 @@
 #include "types.h"
 #include "WriteEstimate.h"
 #include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
 #include "EventFrame.h"
 #include "McastFrameHandler.h"
 
@@ -58,7 +57,8 @@
 class Connection :
         public RefCounted,
         public sys::ConnectionInputHandler,
-        public framing::AMQP_AllOperations::ClusterConnectionHandler
+        public framing::AMQP_AllOperations::ClusterConnectionHandler,
+        private broker::Connection::ErrorListener
         
 {
   public:
@@ -120,7 +120,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
@@ -156,6 +156,13 @@
         void handle(framing::AMQFrame&) {}
     };
     
+
+    static NullFrameHandler nullFrameHandler;
+
+    // Error listener functions
+    void connectionError(const std::string&);
+    void sessionError(uint16_t channel, const std::string&);
+    
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
@@ -167,8 +174,6 @@
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
 
-    static NoOpConnectionOutputHandler discardHandler;
-
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
@@ -181,7 +186,6 @@
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
-    NullFrameHandler nullFrameHandler;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
     

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c) // Starts with no pending error (ERROR_TYPE_NONE) and no connection.
+    : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { // By const reference: printing must not copy the set.
+    copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); // Each member id is followed by a space.
+    return o;
+}
+
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+    // Detected a local error: record it, then multicast an error-check control to the cluster.
+    assert(t != ERROR_TYPE_NONE); // Must be an error.
+    assert(type == ERROR_TYPE_NONE); // No earlier error may still be pending.
+    type = t;
+    unresolved = ms; // Members are removed from this set by delivered().
+    frameSeq = seq;
+    connection = &c;
+    QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
+             << " error " << frameSeq << " unresolved: " << unresolved);
+    mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+void ErrorCheck::delivered(const EventFrame& e) {
+    if (isUnresolved()) {
+        const ClusterErrorCheckBody* errorCheck =
+            dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+        const ClusterConfigChangeBody* configChange =
+            dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+
+        if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+            if (errorCheck->getType() < type) { // my error is worse than his
+                QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+                throw Exception("Aborted by local failure that did not occur on all replicas");
+            }
+            else {              // his error is worse/same as mine.
+                QPID_LOG(critical, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+                unresolved.erase(e.getMemberId());
+                checkResolved();
+            }
+        }
+        else {
+            frames.push_back(e); // Only drop matching errorCheck controls.
+            if (configChange) {
+                MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+                MemberSet result;
+                set_intersection(members.begin(), members.end(),
+                                 unresolved.begin(), unresolved.end(),
+                                 inserter(result, result.begin()));
+                unresolved.swap(result);
+                checkResolved();
+            }
+        }
+    }
+    else 
+        frames.push_back(e);
+}
+
+void ErrorCheck::checkResolved() { // Clear the pending error once the unresolved set is empty.
+    if (unresolved.empty()) {   // No more potentially conflicted members, we're clear.
+        type = ERROR_TYPE_NONE; // isUnresolved() now returns false.
+        QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+    }
+    else 
+        QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+EventFrame ErrorCheck::getNext() { // Remove and return the oldest queued frame.
+    assert(canProcess()); // Caller must check canProcess() first.
+    EventFrame e(frames.front());
+    frames.pop_front();
+    return e;
+}
+
+bool ErrorCheck::canProcess() const { // True when no error is pending and at least one frame is queued.
+    return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const { // True while an error is pending.
+    return type != ERROR_TYPE_NONE;
+}
+    
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Sat Apr 11 14:29:04 2009
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ * 
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ */
+class ErrorCheck
+{
+  public:
+    typedef std::set<MemberId> MemberSet;
+    typedef framing::cluster::ErrorType ErrorType;
+    
+    ErrorCheck(Cluster&);
+
+    /** A local error has occurred */
+    void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+    /** Called when a frame is delivered */
+    void delivered(const EventFrame&);
+
+    EventFrame getNext();           // Remove and return the oldest queued frame; requires canProcess().
+
+    bool canProcess() const;        // True when no error is pending and a frame is queued.
+    bool isUnresolved() const;      // True while an error is pending.
+    
+  private:
+    void checkResolved();           // Clear the pending error once `unresolved` is empty.
+    
+    Cluster& cluster;
+    Multicaster& mcast;
+    std::deque<EventFrame> frames;  // Delivered frames not yet handed out by getNext().
+    std::set<MemberId> unresolved;  // Members still to be resolved for the pending error.
+    uint64_t frameSeq;              // Frame sequence number passed to the last error() call.
+    ErrorType type;                 // ERROR_TYPE_NONE when no error is pending.
+    Connection* connection;         // Connection passed to the last error() call.
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_ERRORCHECK_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Sat Apr 11 14:29:04 2009
@@ -45,6 +45,7 @@
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
     bool isLastInEvent() const { return readCredit; }
+    MemberId getMemberId() const { return connectionId.getMember(); }
 
 
     ConnectionId connectionId;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h Sat Apr 11 14:29:04 2009
@@ -52,6 +52,8 @@
             return 0;
     }
 
+    void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
   private:
     typedef std::map<ConnectionId, ConnectionPtr> Map;
     mutable sys::Mutex lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h Sat Apr 11 14:29:04 2009
@@ -30,8 +30,7 @@
 namespace cluster {
 
 /**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler for shadow connections; simply discards frames.
  */
 class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Sat Apr 11 14:29:04 2009
@@ -32,8 +32,9 @@
 
 using namespace framing;
 
-OutputInterceptor::OutputInterceptor(
-    cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
     : parent(p), closing(false), next(&h), sent(),
       writeEstimate(p.getCluster().getWriteEstimate()),
       moreOutput(), doingOutput()
@@ -111,10 +112,10 @@
     QPID_LOG(trace, parent << "Send doOutput request for " << request);
 }
 
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
     sys::Mutex::ScopedLock l(lock);
     closing = true;
-    next = &h;
+    next = &discardHandler;
 }
 
 void OutputInterceptor::close() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
  */
 
 #include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/broker/ConnectionFactory.h"
 #include "qpid/sys/LatencyMetric.h"
@@ -53,7 +54,7 @@
     // Intercept doOutput requests on Connection.
     bool doOutput();
 
-    void closeOutput(sys::ConnectionOutputHandler& h);
+    void closeOutput();
 
     cluster::Connection& parent;
     
@@ -70,6 +71,7 @@
     WriteEstimate writeEstimate;
     bool moreOutput;
     bool doingOutput;
+    static NoOpConnectionOutputHandler discardHandler;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sat Apr 11 14:29:04 2009
@@ -125,15 +125,19 @@
     // Update queue is used to transfer acquired messages that are no longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
-    session.close();
 
     std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
 
+    session.queueDelete(arg::queue=UPDATE);
+    session.close();
+
+
     ClusterConnectionProxy(session).expiryId(expiry.getId());
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
+
     connection.close();
     QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " << updateeUrl);
 }
@@ -202,7 +206,6 @@
         sb.get()->send(transfer, message.payload->getFrames());
         if (message.payload->isContentReleased()){
             uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
             uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
             bool morecontent = true;
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)

Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Sat Apr 11 14:29:04 2009
@@ -114,10 +114,12 @@
     SessionType session;
     qpid::client::SubscriptionManager subs;
     qpid::client::LocalQueue lq;
-    ClientT(uint16_t port, const std::string& name=std::string())
-        : connection(port), session(connection.newSession(name)), subs(session) {}
-    ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
-        : connection(settings), session(connection.newSession(name)), subs(session) {}
+    std::string name;
+
+    ClientT(uint16_t port, const std::string& name_=std::string())
+        : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
+    ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
+        : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
 
     ~ClientT() { connection.close(); }
 };

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Sat Apr 11 14:29:04 2009
@@ -67,16 +67,23 @@
     add(n);
 }
 
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_)
+    : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_)
+{
+    add(n);
+}
+
 const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
     list_of<string>("--auth=no")("--no-data-dir");
 
-ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
+ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
     Args args = list_of<string>("qpidd " __FILE__)
         ("--no-module-dir")
         ("--load-module=../.libs/cluster.so")
         ("--cluster-name")(name) 
         ("--log-prefix")(prefix);
     args.insert(args.end(), userArgs.begin(), userArgs.end());
+    if (updateArgs) updateArgs(args, index);
     return args;
 }
 
@@ -84,7 +91,7 @@
     if (size() != size_t(localIndex))  { // fork a broker process.
         std::ostringstream os; os << "fork" << size();
         std::string prefix = os.str();
-        forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix))));
+        forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
         push_back(forkedBrokers.back()->getPort());
     }
     else {                      // Run in this process
@@ -106,7 +113,7 @@
     assert(int(size()) == localIndex);
     ostringstream os; os << "local" << localIndex;
     string prefix = os.str();
-    Args args(makeArgs(prefix));
+    Args args(makeArgs(prefix, localIndex));
     vector<const char*> argv(args.size());
     transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
     qpid::log::Logger::instance().setPrefix(prefix);
@@ -131,3 +138,22 @@
     kill(n,sig);
     try { c.close(); } catch(...) {}
 }
+
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
+    std::vector<qpid::Url> urls = source.getKnownBrokers();
+    if (n >= 0 && unsigned(n) != urls.size()) {
+        // Retry up to 10 secs in .1 second intervals.
+        for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+            qpid::sys::usleep(1000*100); // 0.1 secs
+            urls = source.getKnownBrokers();
+        }
+    }
+    std::set<int> s;
+    for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) 
+        s.insert((*i)[0].get<qpid::TcpAddress>()->port);
+    return s;
+}

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h Sat Apr 11 14:29:04 2009
@@ -38,6 +38,7 @@
 #include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
+#include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
 
 #include <string>
@@ -69,33 +70,44 @@
 class ClusterFixture : public vector<uint16_t>  {
   public:
     typedef std::vector<std::string> Args;
+    static const Args DEFAULT_ARGS; 
+
     /** @param localIndex can be -1 meaning don't automatically start a local broker.
      * A local broker can be started with addLocal().
      */
     ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
+
+    /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
+    ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs);
+
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();                 // Add a broker.
     void setup();
 
     bool hasLocal() const;
     
-    /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+    /** Kill a forked broker with sig, or shutdown localBroker. */
     void kill(size_t n, int sig=SIGINT);
 
     /** Kill a broker and suppressing errors from closing connection c. */
     void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
 
   private:
-    static const Args DEFAULT_ARGS;
     
     void addLocal();            // Add a local broker.
-    Args makeArgs(const std::string& prefix);
+    Args makeArgs(const std::string& prefix, size_t index);
     string name;
     std::auto_ptr<BrokerFixture> localBroker;
     int localIndex;
     std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
     Args userArgs;
+    boost::function<void (Args&, size_t)> updateArgs;
 };
 
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1);
 
 #endif  /*!CLUSTER_FIXTURE_H*/

Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp Sat Apr 11 14:29:04 2009
@@ -20,12 +20,17 @@
  */
 
 #include "ForkedBroker.h"
+#include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
+#include <boost/algorithm/string.hpp>
 #include <algorithm>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <signal.h>
 
+using namespace std;
+using qpid::ErrnoException;
+
 ForkedBroker::ForkedBroker(const Args& args) { init(args); }
 
 ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); }
@@ -42,14 +47,25 @@
     pid = 0;                // Reset pid here in case of an exception.
     using qpid::ErrnoException;
     if (::kill(savePid, sig) < 0) 
-        throw ErrnoException("kill failed");
+            throw ErrnoException("kill failed");
     int status;
     if (::waitpid(savePid, &status, 0) < 0 && sig != 9) 
         throw ErrnoException("wait for forked process failed");
     if (WEXITSTATUS(status) != 0 && sig != 9) 
         throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
 }
+        
+namespace std {
+static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
+    copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
+    return o;
+}
 
+bool isLogOption(const std::string& s) {
+    return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace");
+}
+
+}
         
 void ForkedBroker::init(const Args& userArgs) {
     using qpid::ErrnoException;
@@ -70,17 +86,19 @@
     }
     else {                  // child
         ::close(pipeFds[0]);
-        // FIXME aconway 2009-02-12: 
         int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
         if (fd < 0) throw ErrnoException("dup2 failed");
         const char* prog = "../qpidd";
         Args args(userArgs);
         args.push_back("--port=0");
-        if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
-            args.push_back("--log-enable=error+"); // Keep quiet except for errors.
+        // Keep quiet except for errors.
+        if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
+            && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
+            args.push_back("--log-enable=error+"); 
         std::vector<const char*> argv(args.size());
         std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
         argv.push_back(0);
+        QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
         execv(prog, const_cast<char* const*>(&argv[0]));
         throw ErrnoException("execv failed");
     }

Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Sat Apr 11 14:29:04 2009
@@ -53,6 +53,7 @@
     ~ForkedBroker();
 
     void kill(int sig=SIGINT);
+    int wait();                 // Wait for exit, return exit status.
     uint16_t getPort() { return port; }
     pid_t getPID() { return pid; }
 

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Sat Apr 11 14:29:04 2009
@@ -110,11 +110,17 @@
 # 	amqp_0_10/Map.cpp \
 # 	amqp_0_10/handlers.cpp 
 
+TESTLIBFLAGS = -module -rpath $(abs_builddir)
 
 check_LTLIBRARIES += libshlibtest.la
-libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
+libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS)
 libshlibtest_la_SOURCES = shlibtest.cpp
 
+check_LTLIBRARIES += test_store.la
+test_store_la_SOURCES = test_store.cpp
+test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LDFLAGS = $(TESTLIBFLAGS)
+
 include cluster.mk
 if SSL
 include ssl.mk

Added: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
+
+    using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
+
+void updateArgs(ClusterFixture::Args& args, size_t index) { // Add per-broker options: test store "s<index>", fresh data dir, no auth.
+    ostringstream os;
+    os << "--test-store-name=s" << index;
+    args.push_back(os.str());
+    args.push_back("--load-module=.libs/test_store.so");
+    char dataDir[] = "/tmp/PartialFailure.XXXXXX"; // Writable buffer: mkdtemp() rewrites the template in place.
+    if (!mkdtemp(dataDir))
+        throw ErrnoException("Can't create data dir");
+    args.push_back(string("--data-dir=")+dataDir);
+    args.push_back("--auth=no");
+
+    // These tests generate errors deliberately, disable error logging unless a log env var is set.
+    if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) {
+        args.erase(remove_if(args.begin(), args.end(), isLogOption), args.end()); // remove_if alone does not shrink args.
+        args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs.
+    }
+}
+
+Message pMessage(string data, string q) { // Build Message(data, q) and mark it PERSISTENT; q is presumably the routing key -- confirm.
+    Message msg(data, q);
+    msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+    return msg;
+}
+
+void queueAndSub(Client& c) { // Declare a durable queue named c.name and subscribe c.lq to it.
+    c.session.queueDeclare(c.name, durable=true);
+    c.subs.subscribe(c.lq, c.name);
+}
+
+// Verify normal cluster-wide errors.
+QPID_AUTO_TEST_CASE(testNormalErrors) {
+    // FIXME aconway 2009-04-10: Would like to put a scope just around
+    // the statements expected to fail (in BOOST_CHECK_THROW) but that
+    // sporadically lets out messages, possibly because they're in
+    // Connection thread.
+    ScopedSuppressLogging allQuiet; 
+
+    ClusterFixture cluster(3, -1, updateArgs);    
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+
+    queueAndSub(c0);
+    c0.session.messageTransfer(content=Message("x", "c0"));
+    BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+    // Session error.
+    BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+    c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+    // Connection error, kill c1 on all members.
+    queueAndSub(c1);
+    BOOST_CHECK_THROW(
+        c1.session.messageTransfer(
+            content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+        ConnectionException);
+    c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+    BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+    BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+    BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+}
+
+
+// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
+QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(1, -1, updateArgs);
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Kill the new guy
+    cluster.add();
+    Client c1(cluster[1]);
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+    BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+    // Kill the old guy
+    cluster.add();
+    Client c2(cluster[2]);
+    c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+    BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+}
+
+// Test that if one member fails and others do not, the failed member leaves the cluster.
+QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(3, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+    
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+    // Cause partial failure on c1
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("b", "q"));
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+    // Cause partial failure on c2
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("c", "q"));
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+}
+
+// Test multiple partial failures: 2 fail, 2 pass.
+QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(4, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+    Client c3(cluster[3], "c3");
+    
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Cause partial failure on c1, c2
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("b", "q"));
+    c3.session.messageTransfer(content=pMessage("c", "q"));
+    BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+}
+
+/** FIXME aconway 2009-04-10:
+ * The current approach to shutting down a process in test_store
+ * sometimes leads to assertion failures and errors in the shut-down
+ * process. Need a cleaner solution
+ */
+#if 0
+QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(2, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Cause failure on member 0 and simultaneous crash on member 1.
+    BOOST_CHECK_THROW(
+        c0.session.messageTransfer(
+            content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+        ConnectionException);
+    cluster.wait(1);
+
+    Client c00(cluster[0], "c00"); // Old connection is dead.
+    BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+}
+#endif
+
+
+QPID_AUTO_TEST_SUITE_END()

Propchange: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Sat Apr 11 14:29:04 2009
@@ -34,8 +34,10 @@
   federated_cluster_test clustered_replication_test
 
 check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
-cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework 
+cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
+	cluster_test.cpp PartialFailure.cpp
+
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework 
 
 unit_test_LDADD+=../cluster.la
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Sat Apr 11 14:29:04 2009
@@ -73,7 +73,7 @@
 
 
 ostream& operator<<(ostream& o, const cpg_name* n) {
-    return o << cluster::Cpg::str(*n);
+    return o << Cpg::str(*n);
 }
 
 ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -89,29 +89,12 @@
     return o;
 }
 
-template <class C> set<uint16_t> makeSet(const C& c) {
-    set<uint16_t> s;
+template <class C> set<int> makeSet(const C& c) {
+    set<int> s;
     copy(c.begin(), c.end(), inserter(s, s.begin()));
     return s;
 }
 
-template <class T>  set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
-    vector<Url> urls = source.getKnownBrokers();
-    if (n >= 0 && unsigned(n) != urls.size()) {
-        BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
-        // Retry up to 10 secs in .1 second intervals.
-        for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
-            sys::usleep(1000*100); // 0.1 secs
-            urls = source.getKnownBrokers();
-        }
-    }
-    BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
-    set<uint16_t> s;
-    for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) 
-        s.insert((*i)[0].get<TcpAddress>()->port);
-    return s;
-}
-
 class Sender {
   public:
     Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
@@ -175,7 +158,6 @@
 
 QPID_AUTO_TEST_CASE(testAcl) {
     ofstream policyFile("cluster_test.acl");
-    // FIXME aconway 2009-02-12: guest -> qpidd?
     policyFile << "acl allow foo@QPID create queue name=foo" << endl
                << "acl allow foo@QPID create queue name=foo2" << endl
                << "acl deny foo@QPID create queue name=bar" << endl
@@ -446,13 +428,13 @@
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
-    set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+    set<int> kb0 = knownBrokerPorts(c0.connection);
     BOOST_CHECK_EQUAL(kb0.size(), 1u);
     BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
 
     cluster.add();
     Client c1(cluster[1], "c1");
-    set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+    set<int> kb1 = knownBrokerPorts(c1.connection);
     kb0 = knownBrokerPorts(c0.connection, 2);
     BOOST_CHECK_EQUAL(kb1.size(), 2u);
     BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -460,7 +442,7 @@
 
     cluster.add();
     Client c2(cluster[2], "c2");
-    set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+    set<int> kb2 = knownBrokerPorts(c2.connection);
     kb1 = knownBrokerPorts(c1.connection, 3);
     kb0 = knownBrokerPorts(c0.connection, 3);
     BOOST_CHECK_EQUAL(kb2.size(), 3u);

Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
 # failures:
 srcdir=`dirname $0`
 PYTHON_DIR=$srcdir/../../../python
+export PYTHONPATH=$PYTHON_DIR
 
 trap stop_brokers INT EXIT
 

Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Sat Apr 11 14:29:04 2009
@@ -220,63 +220,13 @@
         cout << "\n\n\n\n";
     }
 
-
-    /* 
-       Only call this if you already know there is at least 
-       one child still running.  Supply a time in seconds.
-       If it has been at least that long since a shild stopped
-       running, we judge the system to have hung.
-    */
-    int
-    hanging ( int hangTime )
-    {
-        struct timeval now,
-                       duration;
-        gettimeofday ( &now, 0 );
-
-        int how_many_hanging = 0;
-
-        vector<child *>::iterator i;
-        for ( i = begin(); i != end(); ++ i )
-        {
-            //Not in POSIX
-            //timersub ( & now, &((*i)->startTime), & duration );
-            duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec;
-            duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec;
-            if (duration.tv_usec < 0) {
-                --duration.tv_sec;
-                duration.tv_usec += 1000000;
-            }
-
-            if ( (COMPLETED != (*i)->status)     // child isn't done running
-                  &&
-                 ( duration.tv_sec >= hangTime ) // it's been too long
-               )
-            {
-                std::cerr << "Child of type " 
-                          << (*i)->type 
-                          << " hanging.   "
-                          << "PID is "
-                          << (*i)->pid
-                          << endl;
-                ++ how_many_hanging;
-            }
-        }
-        
-        return how_many_hanging;
-    }
-    
-
     int verbosity;
 };
 
 
-
 children allMyChildren;
 
 
-
-
 void 
 childExit ( int ) 
 {
@@ -389,6 +339,7 @@
         ("--log-prefix")
         (prefix.str())
         ("--log-to-file")
+        ("--log-enable=error+")
         (prefix.str()+".log");
 
     if (endsWith(moduleOrDir, "cluster.so")) {
@@ -818,16 +769,6 @@
              return ERROR_ON_CHILD;
          }
 
-         // If one is hanging, quit.
-         if ( allMyChildren.hanging ( 120 ) )
-         {
-             /*
-              * Don't kill any processes.  Leave alive for questioning.
-              * */
-             std::cerr << "END_OF_TEST ERROR_HANGING\n";
-             return HANGING;
-         }
-
          if ( verbosity > 1 ) {
            std::cerr << "------- next kill-broker loop --------\n";
            allMyChildren.print();

Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Sat Apr 11 14:29:04 2009
@@ -51,5 +51,6 @@
 VERBOSITY=${VERBOSITY:-1}
 DURABILITY=${DURABILITY:-0}
 
+rm -f soak-*.log
 exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
 

Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Sat Apr 11 14:29:04 2009
@@ -28,15 +28,17 @@
     echo $* | newgrp ais
 }
 
-rm -f cluster*.log
-SIZE=${1:-1}; shift
+rm -f cluster*.log cluster.ports qpidd.port
+
+SIZE=${1:-3}; shift
 CLUSTER=`pwd`		# Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --auth=no $@"
 
 for (( i=0; i<SIZE; ++i )); do
-    PORT=`with_ais_group ../qpidd  -p0 --log-to-file=cluster$i.log $OPTS`  || exit 1
+    DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX`
+    PORT=`with_ais_group ../qpidd  -p0 --log-to-file=cluster$i.log $OPTS  --data-dir=$DDIR`  || exit 1
     echo $PORT >> cluster.ports
 done
 
-head cluster.ports > qpidd.port	# First member's port for tests.
+head -n 1 cluster.ports > qpidd.port	# First member's port for tests.
 

Added: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**@file
+ * Plug-in message store for tests.
+ * 
+ * Add functionality as required, build up a comprehensive set of
+ * features to support persistent behavior tests.
+ *
+ * Current features: special "action" messages can:
+ *  - raise exception from enqueue.
+ *  - force host process to exit.
+ *  - do async completion after a delay.
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/cast.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid;
+using namespace broker;
+using namespace std;
+using namespace boost;
+using namespace qpid::sys;
+
+struct TestStoreOptions : public Options {
+
+    string name;                // Value of --test-store-name; TestStorePlugin passes it to TestStore.
+
+    TestStoreOptions() : Options("Test Store Options") {
+        addOptions()
+            ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+    }
+};
+
+struct Completer : public Runnable { // Runnable that sleeps, then calls enqueueComplete() on one message.
+    intrusive_ptr<PersistableMessage> message; // Message to complete after the delay.
+    int usecs;                                 // Delay handed to qpid::sys::usleep().
+    Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
+    void run() {
+        qpid::sys::usleep(usecs);
+        message->enqueueComplete();
+        delete this;            // Self-owning: TestStore::enqueue allocates instances with new and never frees them.
+    }
+};
+    
+class TestStore : public NullMessageStore { // Store whose enqueue() obeys "TEST_STORE_DO: " action messages.
+  public:
+    TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+
+    ~TestStore() {              // Join every thread that enqueue() started for an async action.
+        for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
+    }
+
+    void enqueue(TransactionContext* ,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const PersistableQueue& )
+    {
+        string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+
+        // Check the message for special instructions: TEST_STORE_DO prefix, then NAME[ACTION] where NAME is this store's name.
+        size_t i, j; 
+        if (starts_with(data, TEST_STORE_DO)
+            && (i = data.find(name+"[")) != string::npos
+            && (j = data.find("]", i)) != string::npos)
+        {
+            size_t start = i+name.size()+1; // Index of the first character after "NAME[".
+            string action = data.substr(start, j-start);
+
+            if (action == EXCEPTION) {
+                throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+            }
+            else if (action == EXIT_PROCESS) {
+                // FIXME aconway 2009-04-10: this is a dubious way to
+                // close the process at best, it can cause assertions or seg faults
+                // rather than clean exit.
+                QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
+                exit(0);
+            }
+            else if (starts_with(action, ASYNC)) { // "async N": complete from a new thread after a delay of N.
+                std::string delayStr(action.substr(ASYNC.size()));
+                int delay = lexical_cast<int>(delayStr);
+                threads.push_back(Thread(*new Completer(msg, delay))); // Completer deletes itself at the end of run().
+            }
+            else {
+                QPID_LOG(error, "TestStore " << name << " unknown action " << action);
+                msg->enqueueComplete(); // Unknown actions are logged, then the enqueue completes normally.
+            }
+        }
+        else
+            msg->enqueueComplete(); // No instruction addressed to this store: complete immediately.
+    }
+
+  private:
+    static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+    string name;                // Identifies this store inside action messages.
+    Broker& broker;             // Stored by the constructor; not read by any code in this class.
+    vector<Thread> threads;     // Threads started for async actions; joined in the destructor.
+};
+
+const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
+const string TestStore::EXCEPTION = "exception";
+const string TestStore::EXIT_PROCESS = "exit_process";
+const string TestStore::ASYNC="async ";
+
+struct TestStorePlugin : public Plugin { // Sets a TestStore as the store when the plugin target is a Broker.
+
+    TestStoreOptions options;
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize (Plugin::Target& target)
+    {
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (!broker) return;    // Target is not a Broker: nothing to install.
+        broker->setStore (new TestStore(options.name, *broker)); // NOTE(review): presumably the broker takes ownership -- confirm.
+    }
+
+    void initialize(qpid::Plugin::Target&) {}
+};
+
+static TestStorePlugin pluginInstance;

Propchange: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Sat Apr 11 14:29:04 2009
@@ -48,6 +48,19 @@
       <field name="id" type="uint64"/>
     </control>
     
+    <domain name="error-type" type="uint8" label="Types of error">
+      <enum>
+	<choice name="none" value="0"/>
+	<choice name="session" value="1"/>
+	<choice name="connection" value="2"/>
+      </enum>
+    </domain>
+	
+    <control name="error-check" code="0x13">
+      <field name="type" type="error-type"/>
+      <field name="frame-seq" type="uint64"/>
+    </control>
+    
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
 
   </class>
@@ -132,6 +145,7 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
+      <field name="frame-seq" type="uint64"/>	 <!-- frame sequence number -->
     </control>
 
     <!-- Set the position of a replicated queue. -->
@@ -146,5 +160,6 @@
 
     <!-- Set expiry-id for subsequent messages. -->
     <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
+
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org