You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 13:29:14 UTC

svn commit: r821761 [2/6] - in /qpid/branches/java-broker-0-10/qpid: ./ cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/boost-1.32-support/ cpp/build-aux/ cpp/examples/qmf-...

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Oct  5 11:29:03 2009
@@ -429,13 +429,11 @@
     }
 } 
 
-
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
     HandlerHelper(s),
     releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
     releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
-    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
-    acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
  {}
 
 //
@@ -547,8 +545,7 @@
 
 void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
 {
-
-    commands.for_each(acceptOp);
+    state.accepted(commands);
 }
 
 framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Oct  5 11:29:03 2009
@@ -357,8 +357,7 @@
 
 void SessionState::senderCompleted(const SequenceSet& commands) {
     qpid::SessionState::senderCompleted(commands);
-    for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
-        semanticState.completed(i->first(), i->last());
+    semanticState.completed(commands);
 }
 
 void SessionState::readyToSend() {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct  5 11:29:03 2009
@@ -225,12 +225,25 @@
         if (reallyUnbind)
             unbind(queue, routingPattern, 0);
     } else if (fedOp == fedOpReorigin) {
-        for (BindingMap::iterator iter = bindings.begin();
-             iter != bindings.end(); iter++) {
-            const BoundKey& bk = iter->second;
-            if (bk.fedBinding.hasLocal()) {
-                propagateFedOp(iter->first, string(), fedOpBind, string());
+        /** gather up all the keys that need rebinding in a local vector
+         * while holding the lock.  Then propagate once the lock is
+         * released
+         */
+        std::vector<std::string> keys2prop;
+        {
+            RWlock::ScopedRlock l(lock);    
+            for (BindingMap::iterator iter = bindings.begin();
+                 iter != bindings.end(); iter++) {
+                const BoundKey& bk = iter->second;
+                
+                if (bk.fedBinding.hasLocal()) {
+                    keys2prop.push_back(iter->first);
+                }
             }
+        }   /* lock dropped */
+        for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+             key != keys2prop.end(); key++) {
+            propagateFedOp( *key, string(), fedOpBind, string());
         }
     }
 

Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct  5 11:29:03 2009
@@ -1,5 +1,7 @@
+no_keyword
 .deps
 .libs
 Makefile
 Makefile.in
 .dirstamp
+

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Oct  5 11:29:03 2009
@@ -147,11 +147,15 @@
         fail("Connection closed before it was established");
         break;
       case OPEN:
-        setState(CLOSING);
-        proxy.close(200, OK);
-        waitFor(FINISHED);
+        if (setState(CLOSING, OPEN)) {
+            proxy.close(200, OK);
+            waitFor(FINISHED);//FINISHED = CLOSED or FAILED
+        }
+        //else, state was changed from open after we checked, can only
+        //change to failed or closed, so nothing to do
         break;
-        // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
+
+        // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED
     }
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp Mon Oct  5 11:29:03 2009
@@ -60,6 +60,18 @@
     stateLock.notifyAll();
 }
 
+bool StateManager::setState(int s, int expected)
+{
+    Monitor::ScopedLock l(stateLock);
+    if (state == expected) {
+        state = s;
+        stateLock.notifyAll();
+        return true;
+    } else {
+        return false;
+    }
+}
+
 int StateManager::getState() const
 {
     Monitor::ScopedLock l(stateLock);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h Mon Oct  5 11:29:03 2009
@@ -36,6 +36,7 @@
 public:
     StateManager(int initial);
     void setState(int state);
+    bool setState(int state, int expected);
     int getState() const ;
     void waitForStateChange(int current);
     void waitFor(std::set<int> states);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Oct  5 11:29:03 2009
@@ -190,6 +190,7 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
+    quorum(boost::bind(&Cluster::leave, this)),
     initialized(false),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
@@ -214,7 +215,6 @@
     // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
     broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
 
-    if (settings.quorum) quorum.init();
     cpg.join(name);
     // pump the CPG dispatch manually till we get initialized. 
     while (!initialized)
@@ -226,10 +226,10 @@
 }
 
 void Cluster::initialize() {
+    if (settings.quorum) quorum.start(poller);
     if (myUrl.empty())
         myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
-    QPID_LOG(notice, *this << " member " << self <<  " joining "
-             << name << " with url=" << myUrl);
+    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
@@ -315,7 +315,7 @@
 // Deliver CPG message.
 void Cluster::deliver(
     cpg_handle_t /*handle*/,
-    cpg_name* /*group*/,
+    const cpg_name* /*group*/,
     uint32_t nodeid,
     uint32_t pid,
     void* msg,
@@ -323,9 +323,11 @@
 {
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
-    Event e(Event::decodeCopy(from, buf));
-    LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
-    deliverEvent(e);
+    while (buf.available()) {
+        Event e(Event::decodeCopy(from, buf));
+        LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
+        deliverEvent(e);
+    }
 }
 
 LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
@@ -404,6 +406,7 @@
     LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
     LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
     Mutex::ScopedLock l(lock);
+    if (state == LEFT) return;
     EventFrame e(efConst);
     const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
     if (offer && error.isUnresolved()) {
@@ -507,10 +510,10 @@
 
 void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
-    cpg_name */*group*/,
-    cpg_address *current, int nCurrent,
-    cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int /*nJoined*/)
+    const cpg_name */*group*/,
+    const cpg_address *current, int nCurrent,
+    const cpg_address *left, int nLeft,
+    const cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
     if (state == INIT) {        // First config change.
@@ -518,10 +521,13 @@
         broker.setRecovery(nCurrent == 1);
         initialized = true;
     }
-    QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
-             << AddrList(left, nLeft, "left: "));
+    QPID_LOG(notice, *this << " membership change: "
+             << AddrList(current, nCurrent) << "("
+             << AddrList(joined, nJoined, "joined: ")
+             << AddrList(left, nLeft, "left: ")
+             << ")");
     std::string addresses;
-    for (cpg_address* p = current; p < current+nCurrent; ++p) 
+    for (const cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
     deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
 }
@@ -833,9 +839,9 @@
         "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
-    o << "cluster:" << STATE[cluster.state];
+    o << "cluster(" << cluster.self << " " << STATE[cluster.state];
     if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
-    return o;
+    return o << ")";;
 }
 
 MemberId Cluster::getId() const {
@@ -846,14 +852,6 @@
     return broker; // Immutable,  no need to lock.
 }
 
-void Cluster::checkQuorum() {
-    if (!quorum.isQuorate()) {
-        QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
-        leave();
-        throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
-    }
-}
-
 void Cluster::setClusterId(const Uuid& uuid, Lock&) {
     clusterId = uuid;
     if (mgmtObject) {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h Mon Oct  5 11:29:03 2009
@@ -101,8 +101,6 @@
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
 
-    void checkQuorum();
-
     const ClusterSettings& getSettings() const { return settings; }
 
     void deliverFrame(const EventFrame&);
@@ -169,7 +167,7 @@
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
-        struct cpg_name *group,
+        const struct cpg_name *group,
         uint32_t /*nodeid*/,
         uint32_t /*pid*/,
         void* /*msg*/,
@@ -179,10 +177,10 @@
     
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
-        struct cpg_name */*group*/,
-        struct cpg_address */*members*/, int /*nMembers*/,
-        struct cpg_address */*left*/, int /*nLeft*/,
-        struct cpg_address */*joined*/, int /*nJoined*/
+        const struct cpg_name */*group*/,
+        const struct cpg_address */*members*/, int /*nMembers*/,
+        const struct cpg_address */*left*/, int /*nLeft*/,
+        const struct cpg_address */*joined*/, int /*nJoined*/
     );
 
     // == Called in management threads.

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Oct  5 11:29:03 2009
@@ -72,7 +72,7 @@
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
-            // FIXME aconway 2009-05-20: temporary 
+            // TODO aconway 2009-05-20: temporary, remove
             ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
             ;
     }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h Mon Oct  5 11:29:03 2009
@@ -37,7 +37,7 @@
     bool checkErrors;
 
     ClusterSettings() : quorum(false), readMax(10),
-                        checkErrors(true) // FIXME aconway 2009-05-20: temporary
+                        checkErrors(true) // TODO aconway 2009-05-20: remove this option.
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Oct  5 11:29:03 2009
@@ -49,6 +49,28 @@
 // Global callback functions.
 void Cpg::globalDeliver (
     cpg_handle_t handle,
+    const struct cpg_name *group,
+    uint32_t nodeid,
+    uint32_t pid,
+    void* msg,
+    size_t msg_len)
+{
+    cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
+}
+
+void Cpg::globalConfigChange(
+    cpg_handle_t handle,
+    const struct cpg_name *group,
+    const struct cpg_address *members, size_t nMembers,
+    const struct cpg_address *left, size_t nLeft,
+    const struct cpg_address *joined, size_t nJoined
+)
+{
+    cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+}
+
+void Cpg::globalDeliver (
+    cpg_handle_t handle,
     struct cpg_name *group,
     uint32_t nodeid,
     uint32_t pid,
@@ -83,7 +105,7 @@
 
     QPID_LOG(info, "Initializing CPG");
     cpg_error_t err = cpg_initialize(&handle, &callbacks);
-    int retries = 6;
+    int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
     while (err == CPG_ERR_TRY_AGAIN && --retries) {
         QPID_LOG(notice, "Re-trying CPG initialization.");
         sys::sleep(5);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h Mon Oct  5 11:29:03 2009
@@ -20,7 +20,6 @@
  */
 
 #include "qpid/Exception.h"
-#include "qpid/cluster/Dispatchable.h"
 #include "qpid/cluster/types.h"
 #include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Mutex.h"
@@ -68,7 +67,7 @@
         virtual ~Handler() {};
         virtual void deliver(
             cpg_handle_t /*handle*/,
-            struct cpg_name *group,
+            const struct cpg_name *group,
             uint32_t /*nodeid*/,
             uint32_t /*pid*/,
             void* /*msg*/,
@@ -76,10 +75,10 @@
 
         virtual void configChange(
             cpg_handle_t /*handle*/,
-            struct cpg_name */*group*/,
-            struct cpg_address */*members*/, int /*nMembers*/,
-            struct cpg_address */*left*/, int /*nLeft*/,
-            struct cpg_address */*joined*/, int /*nJoined*/
+            const struct cpg_name */*group*/,
+            const struct cpg_address */*members*/, int /*nMembers*/,
+            const struct cpg_address */*left*/, int /*nLeft*/,
+            const struct cpg_address */*joined*/, int /*nJoined*/
         ) = 0;
     };
 
@@ -122,6 +121,24 @@
 
     static Cpg* cpgFromHandle(cpg_handle_t);
 
+    // New versions for corosync 1.0 and higher
+    static void globalDeliver(
+        cpg_handle_t handle,
+        const struct cpg_name *group,
+        uint32_t nodeid,
+        uint32_t pid,
+        void* msg,
+        size_t msg_len);
+
+    static void globalConfigChange(
+        cpg_handle_t handle,
+        const struct cpg_name *group,
+        const struct cpg_address *members, size_t nMembers,
+        const struct cpg_address *left, size_t nLeft,
+        const struct cpg_address *joined, size_t nJoined
+    );
+
+    // Old versions for openais
     static void globalDeliver(
         cpg_handle_t handle,
         struct cpg_name *group,

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp Mon Oct  5 11:29:03 2009
@@ -72,7 +72,7 @@
     if (buf.available() < e.size)
         throw Exception("Not enough data for multicast event");
     e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
-    memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
+    buf.getRawData((uint8_t*)(e.getData()), e.size);
     return e;
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon Oct  5 11:29:03 2009
@@ -24,10 +24,14 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQFrame.h"
+#include <boost/bind.hpp>
+#include <algorithm>
 
 namespace qpid {
 namespace cluster {
 
+static const int MCAST_IOV_MAX=63; // Limit imposed by CPG
+
 Multicaster::Multicaster(Cpg& cpg_, 
                          const boost::shared_ptr<sys::Poller>& poller,
                          boost::function<void()> onError_) :
@@ -36,7 +40,8 @@
 #endif
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
-    holding(true)
+    holding(true),
+    ioVector(MCAST_IOV_MAX)
 {
     queue.start();
 }
@@ -70,26 +75,29 @@
     queue.push(e);
 }
 
-
-Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(
+    const PollableEventQueue::Batch& events)
+{
+    PollableEventQueue::Batch::const_iterator i = events.begin();
     try {
-        PollableEventQueue::Batch::const_iterator i = values.begin();
-        while( i != values.end()) {
-            iovec iov = i->toIovec();
-            if (!cpg.mcast(&iov, 1)) {
-                // cpg didn't send because of CPG flow control.
-                break; 
+        while (i < events.end()) {
+            size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i));
+            std::transform(i, i+count, ioVector.begin(),
+                           boost::bind(&Event::toIovec, _1));
+            if (!cpg.mcast(&ioVector.front(), count)) {
+                QPID_LOG(trace, "CPG flow control, will resend "
+                         << events.end() - i << " events");
+                break;
             }
-            ++i;
+            i += count;
         }
-        return i;
     }
     catch (const std::exception& e) {
         QPID_LOG(critical, "Multicast error: " << e.what());
         queue.stop();
         onError();
-        return values.end();
     }
+    return i;
 }
 
 void Multicaster::release() {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h Mon Oct  5 11:29:03 2009
@@ -29,6 +29,7 @@
 #include "qpid/sys/LatencyTracker.h"
 #include <boost/shared_ptr.hpp>
 #include <deque>
+#include <vector>
 
 namespace qpid {
 
@@ -72,7 +73,7 @@
     PollableEventQueue queue;
     bool holding;
     PlainEventQueue holdingQueue;
-    std::vector<struct ::iovec> ioVector;
+    std::vector<  ::iovec> ioVector;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Mon Oct  5 11:29:03 2009
@@ -46,7 +46,6 @@
 
 void OutputInterceptor::send(framing::AMQFrame& f) {
     LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
-    parent.getCluster().checkQuorum();
     {
         sys::Mutex::ScopedLock l(lock);
         next->send(f);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Mon Oct  5 11:29:03 2009
@@ -33,15 +33,18 @@
                      boost::bind(&PollerDispatch::dispatch, this, _1), // read
                      0,         // write
                      boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect
-      )
+      ),
+      started(false)
 {}
     
 PollerDispatch::~PollerDispatch() {
-    dispatchHandle.stopWatch();
+    if (started)
+        dispatchHandle.stopWatch();
 }
 
 void PollerDispatch::start() {
     dispatchHandle.startWatch(poller);
+    started = true;
 }
 
 // Entry point: called by IO to dispatch CPG events.

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h Mon Oct  5 11:29:03 2009
@@ -51,6 +51,7 @@
     boost::shared_ptr<sys::Poller> poller;
     boost::function<void()> onError;
     sys::DispatchHandleRef dispatchHandle;
+    bool started;
 
 
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Mon Oct  5 11:29:03 2009
@@ -18,28 +18,86 @@
  * under the License.
  *
  */
+
 #include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Options.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
 
 namespace qpid {
 namespace cluster {
 
-Quorum::Quorum() : enable(false), cman(0) {}
+namespace {
+
+boost::function<void()> errorFn;
 
-Quorum::~Quorum() { if (cman) cman_finish(cman); }
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
+    if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+        QPID_LOG(critical, "Lost contact with cluster quorum.");
+        if (errorFn) errorFn();
+        cman_stop_notification(handle);
+    }
+}
+}
+
+Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) {
+    errorFn = err;
+}
+
+Quorum::~Quorum() {
+    dispatchHandle.reset();
+    if (cman) cman_finish(cman);
+}
 
-void Quorum::init() {
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+    poller = p;
     enable = true;
+    QPID_LOG(debug, "Connecting to quorum service.");
     cman = cman_init(0);
     if (cman == 0) throw ErrnoException("Can't connect to cman service");
     if (!cman_is_quorate(cman)) {
         QPID_LOG(notice, "Waiting for cluster quorum.");
         while(!cman_is_quorate(cman)) sys::sleep(5);
     }
+    int err = cman_start_notification(cman, cmanCallbackFn);
+    if (err != 0) throw ErrnoException("Can't register for cman notifications");
+    watch(getFd());
 }
 
-bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
+void Quorum::watch(int fd) {
+    cmanFd = fd;
+    dispatchHandle.reset(
+        new sys::DispatchHandleRef(
+            sys::PosixIOHandle(cmanFd),
+            boost::bind(&Quorum::dispatch, this, _1), // read
+            0, // write
+            boost::bind(&Quorum::disconnect, this, _1)  // disconnect
+        ));
+    dispatchHandle->startWatch(poller);
+}
+
+int Quorum::getFd() {
+    int fd = cman_get_fd(cman);
+    if (fd == 0) throw ErrnoException("Can't get cman file descriptor");
+    return fd;
+}
+
+void Quorum::dispatch(sys::DispatchHandle&) {
+    try {
+        cman_dispatch(cman, CMAN_DISPATCH_ALL);
+        int fd = getFd();
+        if (fd != cmanFd) watch(fd);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+        errorFn();
+    }
+}
+
+void Quorum::disconnect(sys::DispatchHandle&) {
+    QPID_LOG(critical, "Disconnected from quorum service");
+    errorFn();
+}
 
 }} // namespace qpid::cluster

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h Mon Oct  5 11:29:03 2009
@@ -22,26 +22,40 @@
  *
  */
 
+#include <qpid/sys/DispatchHandle.h>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+
 extern "C" {
 #include <libcman.h>
 }
 
 namespace qpid {
-
-class Options;
+namespace sys {
+class Poller;
+}
 
 namespace cluster {
+class Cluster;
 
 class Quorum {
   public:
-    Quorum();
+    Quorum(boost::function<void ()> onError);
     ~Quorum();
-    void init();
-    bool isQuorate();
+    void start(boost::shared_ptr<sys::Poller>);
     
   private:
+    void dispatch(sys::DispatchHandle&);
+    void disconnect(sys::DispatchHandle&);
+    int getFd();
+    void watch(int fd);
+    
     bool enable;
     cman_handle_t cman;
+    int cmanFd;
+    std::auto_ptr<sys::DispatchHandleRef> dispatchHandle;
+    boost::shared_ptr<sys::Poller> poller;
 };
 
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h Mon Oct  5 11:29:03 2009
@@ -21,15 +21,20 @@
  * under the License.
  *
  */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+
 namespace qpid {
 namespace cluster {
+class Cluster;
 
 /** Null implementation of quorum. */
 
 class Quorum {
   public:
-    void init() {}
-    bool isQuorate() { return true; }
+    Quorum(boost::function<void ()>) {}
+    void start(boost::shared_ptr<sys::Poller>) {}
 };
 
 #endif  /*!QPID_CLUSTER_QUORUM_NULL_H*/

Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct  5 11:29:03 2009
@@ -1,3 +1,157 @@
+DtxSetTimeoutBody.cpp
+AllInvoker.cpp
+XaResult.cpp
+ClusterConnectionTxDequeueBody.cpp
+ClusterUpdateRequestBody.cpp
+SessionKnownCompletedBody.cpp
+MessageAcquireBody.cpp
+ClientInvoker.cpp
+ExchangeDeclareBody.cpp
+ClusterConnectionOutputTaskBody.cpp
+ClusterConnectionSessionStateBody.cpp
+ClusterUpdateOfferBody.cpp
+ConnectionStartOkBody.cpp
+ClusterShutdownBody.cpp
+ConnectionTuneOkBody.cpp
+AMQP_ClientProxy.h
+ServerInvoker.cpp
+MessageSubscribeBody.cpp
+TxSelectBody.cpp
+QueueQueryResult.cpp
+FileQosOkBody.cpp
+ExecutionSyncBody.cpp
+ClusterConnectionDeliverCloseBody.cpp
+Xid.cpp
+ClusterMessageExpiredBody.cpp
+AMQP_ServerProxy.h
+ClusterConnectionMembershipBody.cpp
+StreamQosBody.cpp
+ConnectionOpenBody.cpp
+DtxGetTimeoutResult.cpp
+ExchangeQueryResult.cpp
+FileOpenBody.cpp
+MessageResumeBody.cpp
+DtxRecoverBody.cpp
+ClusterConnectionAnnounceBody.cpp
+MessageCancelBody.cpp
+ExchangeBoundResult.cpp
+ClusterConnectionTxEnqueueBody.cpp
+DtxGetTimeoutBody.cpp
+MethodBodyFactory.cpp
+SessionGapBody.cpp
+ConnectionHeartbeatBody.cpp
+SessionCompletedBody.cpp
+ConnectionRedirectBody.cpp
+ConnectionCloseBody.cpp
+MessageTransferBody.cpp
+ConnectionTuneBody.cpp
+DtxSelectBody.cpp
+ExecutionResultBody.cpp
+FileStageBody.cpp
+QueueDeleteBody.cpp
+StreamQosOkBody.cpp
+ClusterConnectionRetractOfferBody.cpp
+SessionConfirmedBody.cpp
+MessageFlowBody.cpp
+ConnectionCloseOkBody.cpp
+AMQP_ServerOperations.h
+ExecutionExceptionBody.cpp
+MessageReleaseBody.cpp
+ClusterRetractOfferBody.cpp
+FragmentProperties.cpp
+SessionRequestTimeoutBody.cpp
+ClusterReadyBody.cpp
+DtxRollbackBody.cpp
+DtxEndBody.cpp
+SessionTimeoutBody.cpp
+FileQosBody.cpp
+SessionExpectedBody.cpp
+ConnectionSecureBody.cpp
+ClusterConfigChangeBody.cpp
+ExchangeBindBody.cpp
+ClusterConnectionAddQueueListenerBody.cpp
+ClusterConnectionTxStartBody.cpp
+ClusterConnectionTxEndBody.cpp
+ExchangeDeleteBody.cpp
+FileRejectBody.cpp
+ClusterConnectionExpiryIdBody.cpp
+AMQP_ClientProxy.cpp
+ClusterConnectionDeliverDoOutputBody.cpp
+StreamPublishBody.cpp
+DeliveryProperties.cpp
+FileConsumeOkBody.cpp
+SessionDetachedBody.cpp
+SessionAttachBody.cpp
+AMQP_ServerProxy.cpp
+StreamProperties.cpp
+AllInvoker.h
+MessageSetFlowModeBody.cpp
+TypeCode.cpp
+FileAckBody.cpp
+ConnectionSecureOkBody.cpp
+AMQP_AllProxy.cpp
+MethodBodyDefaultVisitor.h
+DtxStartBody.cpp
+reply_exceptions.cpp
+FileProperties.cpp
+ClusterConnectionAccumulatedAckBody.cpp
+ClusterConnectionAbortBody.cpp
+ClusterErrorCheckBody.cpp
+TxCommitBody.cpp
+TxRollbackBody.cpp
+MethodBodyDefaultVisitor.cpp
+StreamDeliverBody.cpp
+ClusterConnectionQueueBody.cpp
+QueueQueryBody.cpp
+ConnectionOpenOkBody.cpp
+StreamConsumeBody.cpp
+FileOpenOkBody.cpp
+ClusterConnectionShadowReadyBody.cpp
+SessionDetachBody.cpp
+DtxForgetBody.cpp
+frame_body_lists.h
+ClusterConnectionConsumerStateBody.cpp
+StreamConsumeOkBody.cpp
+AMQP_AllOperations.h
+AMQP_ClientOperations.h
+MessageStopBody.cpp
+FilePublishBody.cpp
+ExchangeBoundBody.cpp
+ReplyTo.cpp
+MessageRejectBody.cpp
+ExchangeUnbindBody.cpp
+SessionFlushBody.cpp
+MessageFlushBody.cpp
+QueuePurgeBody.cpp
+StreamReturnBody.cpp
+StreamCancelBody.cpp
+ClientInvoker.h
+DtxCommitBody.cpp
+Header.cpp
+MethodBodyConstVisitor.h
+DtxPrepareBody.cpp
+FileReturnBody.cpp
+SessionAttachedBody.cpp
+FileCancelBody.cpp
+ServerInvoker.h
+SessionCommandPointBody.cpp
+ClusterConnectionDeliveryRecordBody.cpp
+MessageAcquireResult.cpp
+AMQP_AllProxy.h
+MessageProperties.cpp
+all_method_bodies.h
+MessageResumeResult.cpp
+FileDeliverBody.cpp
+ClusterConnectionQueuePositionBody.cpp
+ClusterConnectionTxAcceptBody.cpp
+ClusterConnectionExchangeBody.cpp
+QueueDeclareBody.cpp
+MessageAcceptBody.cpp
+ClusterConnectionTxPublishBody.cpp
+DtxRecoverResult.cpp
+ExchangeQueryBody.cpp
+FileConsumeBody.cpp
+ConnectionStartBody.cpp
 .deps
 .libs
 .dirstamp
@@ -6,3 +160,4 @@
 method_variants.h
 MethodHolderMaxSize.h
 MaxMethodBodySize.h
+

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Oct  5 11:29:03 2009
@@ -41,7 +41,7 @@
 
 AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
 
-AMQFrame::~AMQFrame() { init(); }
+AMQFrame::~AMQFrame() {}
 
 AMQBody* AMQFrame::getBody() {
     // Non-const AMQBody* may be used to modify the body.

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon Oct  5 11:29:03 2009
@@ -284,10 +284,7 @@
         readableCallback(*this);
         writableCallback(*this);
         break;
-    case Poller::DISCONNECTED: {
-        ScopedLock<Mutex> lock(stateLock);
-        poller->unmonitorHandle(*this, Poller::INOUT);
-        }
+    case Poller::DISCONNECTED:
         if (disconnectedCallback) {
             disconnectedCallback(*this);
         }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp Mon Oct  5 11:29:03 2009
@@ -55,9 +55,10 @@
     fire();
 }
 
+// This can only be used to set up the next fire time, after the Timer has already fired
 void TimerTask::setupNextFire() {
     if (period && readyToFire()) {
-        nextFireTime = AbsTime(nextFireTime, period);
+        nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period));
         cancelled = false;
     } else {
         QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
@@ -66,13 +67,11 @@
 
 // Only allow tasks to be delayed
 void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
-void TimerTask::delayTill(AbsTime time) { period = 0; nextFireTime = max(nextFireTime, time); }
 
 void TimerTask::cancel() {
     ScopedLock<Mutex> l(callbackLock);
     cancelled = true;
 }
-bool TimerTask::isCancelled() const { return cancelled; }
 
 Timer::Timer() :
     active(false) 
@@ -98,13 +97,13 @@
 
             // warn on extreme lateness
             AbsTime start(AbsTime::now());
-            Duration late(t->sortTime, start);
-            if (late > 500 * TIME_MSEC) {
-                QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms");
-            }
+            Duration delay(t->sortTime, start);
             {
             ScopedLock<Mutex> l(t->callbackLock);
             if (t->cancelled) {
+                if (delay > 500 * TIME_MSEC) {
+                    QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC << "ms late");
+                }
                 continue;
             } else if(Duration(t->nextFireTime, start) >= 0) {
                 Monitor::ScopedUnlock u(monitor);
@@ -112,7 +111,17 @@
                 // Warn on callback overrun
                 AbsTime end(AbsTime::now());
                 Duration overrun(tasks.top()->nextFireTime, end);
-                if (overrun > 1 * TIME_MSEC) {
+                bool late = delay > 1 * TIME_MSEC;
+                bool overran = overrun > 1 * TIME_MSEC;
+                if (late)
+                if (overran) {
+                    QPID_LOG(warning,
+                        "Timer woken up " << delay / TIME_MSEC << "ms late, "
+                        "overrunning by " << overrun / TIME_MSEC << "ms [taking "
+                        << Duration(start, end) << "]");
+                } else {
+                    QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
+                } else if (overran) {
                     QPID_LOG(warning,
                         "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
                         << Duration(start, end) << "]");

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h Mon Oct  5 11:29:03 2009
@@ -58,9 +58,7 @@
 
     QPID_COMMON_EXTERN void setupNextFire();
     QPID_COMMON_EXTERN void restart();
-    QPID_COMMON_EXTERN void delayTill(AbsTime fireTime);
     QPID_COMMON_EXTERN void cancel();
-    QPID_COMMON_EXTERN bool isCancelled() const;
 
 protected:
     // Must be overridden with callback

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Oct  5 11:29:03 2009
@@ -575,6 +575,11 @@
                 // (just not writable), allow us to readable until we get here again
                 if (epe.events & ::EPOLLHUP) {
                     if (eh.isHungup()) {
+                        eh.setInactive();
+                        // Don't set up last Handle so that we don't reset this handle
+                        // on re-entering Poller::wait. This means that we will never
+                        // be set active again once we've returned disconnected, and so
+                        // can never be returned again.
                         return Event(handle, DISCONNECTED);
                     }
                     eh.setHungup();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp Mon Oct  5 11:29:03 2009
@@ -43,7 +43,8 @@
                           create ? OPEN_ALWAYS : OPEN_EXISTING,
                           FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */
                           NULL);
-    QPID_WINDOWS_CHECK_NOT(h, INVALID_HANDLE_VALUE);
+    if (h == INVALID_HANDLE_VALUE)
+        throw qpid::Exception(path + qpid::sys::strError(GetLastError()));
     impl.reset(new LockFilePrivate(h));
 }
 

Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct  5 11:29:03 2009
@@ -1,3 +1,4 @@
+allSegmentTypes.h
 
 Makefile.in
 Makefile
@@ -41,3 +42,4 @@
 qpid_ping
 datagen
 
+

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am Mon Oct  5 11:29:03 2009
@@ -256,6 +256,18 @@
 datagen_SOURCES=datagen.cpp
 datagen_LDADD=$(lib_common)
 
+check_PROGRAMS+=qrsh_server
+qrsh_server_SOURCES=qrsh_server.cpp
+qrsh_server_LDADD=$(lib_client)
+
+check_PROGRAMS+=qrsh_run
+qrsh_run_SOURCES=qrsh_run.cpp
+qrsh_run_LDADD=$(lib_client)
+
+check_PROGRAMS+=qrsh
+qrsh_SOURCES=qrsh.cpp
+qrsh_LDADD=$(lib_client)
+
 
 TESTS_ENVIRONMENT = \
     VALGRIND=$(VALGRIND) \
@@ -305,9 +317,8 @@
 # Not run under valgrind, too slow
 
 LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
- run_failover_soak  \
+ run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
-# TODO: renable the temporarily disabled the failing reliable_replication_test when QPID-1984 is resolved.
 
 EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test \
   federated_cluster_test_with_node_failure \

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp Mon Oct  5 11:29:03 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,7 +50,7 @@
         int lastWrite = ::write(fd, s.c_str(), s.size());
         if ( lastWrite >= 0) {
             bytesWritten += lastWrite;
-        } 
+        }
     } while (errno != EAGAIN);
     return bytesWritten;
 }
@@ -58,32 +58,32 @@
 int readALot(int fd) {
     int bytesRead = 0;
     char buf[1024];
-    
+
     do {
         errno = 0;
         int lastRead = ::read(fd, buf, sizeof(buf));
         if ( lastRead >= 0) {
             bytesRead += lastRead;
-        } 
+        }
     } while (errno != EAGAIN);
     return bytesRead;
 }
 
 int main(int /*argc*/, char** /*argv*/)
 {
-    try 
+    try
     {
         int sv[2];
         int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
         assert(rc >= 0);
-        
+
         // Set non-blocking
         rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
         assert(rc >= 0);
 
         rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
         assert(rc >= 0);
-        
+
         // Make up a large string
         string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
         for (int i = 0; i < 6; i++)
@@ -97,32 +97,32 @@
         // Write as much as we can to socket 0
         int bytesWritten = writeALot(sv[0], testString);
         cout << "Wrote(0): " << bytesWritten << " bytes\n";
-        
+
         // Read as much as we can from socket 1
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
         cout << "Read(1): " << bytesRead << " bytes\n";
 
         auto_ptr<Poller> poller(new Poller);
-        
+
         PosixIOHandle f0(sv[0]);
         PosixIOHandle f1(sv[1]);
 
         PollerHandle h0(f0);
         PollerHandle h1(f1);
-        
+
         poller->registerHandle(h0);
         poller->monitorHandle(h0, Poller::INOUT);
-        
+
         // h0 should be writable
         Poller::Event event = poller->wait();
         assert(event.handle == &h0);
         assert(event.type == Poller::WRITABLE);
-        
+
         // Write as much as we can to socket 0
         bytesWritten = writeALot(sv[0], testString);
         cout << "Wrote(0): " << bytesWritten << " bytes\n";
-        
+
         // Wait for 500ms - h0 no longer writable
         event = poller->wait(500000000);
         assert(event.handle == 0);
@@ -133,7 +133,7 @@
         event = poller->wait();
         assert(event.handle == &h1);
         assert(event.type == Poller::READ_WRITABLE);
-        
+
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
         cout << "Read(1): " << bytesRead << " bytes\n";
@@ -147,11 +147,11 @@
         // Test multiple interrupts
         assert(poller->interrupt(h0) == true);
         assert(poller->interrupt(h1) == true);
-        
+
         // Make sure we can interrupt them again
         assert(poller->interrupt(h0) == true);
         assert(poller->interrupt(h1) == true);
-        
+
         // Make sure that they both come out
         event = poller->wait();
         assert(event.type == Poller::INTERRUPTED);
@@ -170,25 +170,44 @@
 
         event = poller->wait();
         assert(event.handle == &h0);
-        assert(event.type == Poller::WRITABLE);    
+        assert(event.type == Poller::WRITABLE);
 
         // We didn't write anything so it should still be writable
         event = poller->wait();
         assert(event.handle == &h0);
-        assert(event.type == Poller::WRITABLE);    
+        assert(event.type == Poller::WRITABLE);
 
         poller->unmonitorHandle(h0, Poller::INOUT);
 
         event = poller->wait(500000000);
         assert(event.handle == 0);
-        
+
         poller->unregisterHandle(h1);
+        assert(poller->interrupt(h1) == false);
+
+        // close the other end to force a disconnect
+        ::close(sv[1]);
+
+        // Now make sure that we are readable followed by disconnected
+        // and after that we never return again
+        poller->monitorHandle(h0, Poller::INOUT);
+        event = poller->wait(500000000);
+        assert(event.handle == &h0);
+        assert(event.type == Poller::READABLE);
+        event = poller->wait(500000000);
+        assert(event.handle == &h0);
+        assert(event.type == Poller::DISCONNECTED);
+        event = poller->wait(1500000000);
+        assert(event.handle == 0);
+
+        // Now that we're disconnected, monitoring should have no effect at all
+        poller->unmonitorHandle(h0, Poller::INOUT);
+        event = poller->wait(1500000000);
+        assert(event.handle == 0);
+
         poller->unregisterHandle(h0);
-        
-        // Make sure we can't interrupt them now
         assert(poller->interrupt(h0) == false);
-        assert(poller->interrupt(h1) == false);
-        
+
         // Test shutdown
         poller->shutdown();
         event = poller->wait();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp Mon Oct  5 11:29:03 2009
@@ -19,7 +19,7 @@
  * under the License.
  *
  */
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/Monitor.h"
 #include "unit_test.h"
 #include <math.h>
@@ -28,7 +28,6 @@
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
 
-using namespace qpid::broker;
 using namespace qpid::sys;
 using boost::intrusive_ptr;
 using boost::dynamic_pointer_cast;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py Mon Oct  5 11:29:03 2009
@@ -23,36 +23,10 @@
 from qpid.util import connect
 from qpid.connection import Connection
 from qpid.datatypes import uuid4
-from qpid.testlib import TestBase010, testrunner
+from qpid.testlib import TestBase010
 from qmf.console import Session
 from qpid.datatypes import Message
 
-def scan_args(name, default=None, args=sys.argv[1:]):
-    if (name in args):
-        pos = args.index(name)
-        return args[pos + 1]
-    elif default:
-        return default
-    else:
-        print "Please specify extra argument: %s" % name
-        sys.exit(2)
-
-def extract_args(name, args):
-    if (name in args):
-        pos = args.index(name)
-        del args[pos:pos+2]
-    else:
-        return None
-
-def get_broker_port():
-    return scan_args("--port", "5672")
-
-def get_session(user, passwd):
-    socket = connect('127.0.0.1', int(get_broker_port()))
-    connection = Connection (sock=socket, username=user, password=passwd)
-    connection.start()
-    return connection.session(str(uuid4()))
-
 class ACLFile:
     def __init__(self):
         self.f = open('data_dir/policy.acl','w');
@@ -65,6 +39,12 @@
         
 class ACLTests(TestBase010):
 
+    def get_session(self, user, passwd):
+        socket = connect(self.broker.host, self.broker.port)
+        connection = Connection (sock=socket, username=user, password=passwd)
+        connection.start()
+        return connection.session(str(uuid4()))
+
     def reload_acl(self):
         acl = self.qmf.getObjects(_class="acl")[0]    
         return acl.reloadACLFile()
@@ -93,7 +73,7 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         try:
             session.queue_declare(queue="deny_queue")
         except qpid.session.SessionException, e:
@@ -118,7 +98,7 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         try:
             session.queue_declare(queue="allow_queue")
         except qpid.session.SessionException, e:
@@ -146,7 +126,7 @@
 
         self.reload_acl()
 
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         try:
             session.queue_declare(queue="allow_queue")
         except qpid.session.SessionException, e:
@@ -243,21 +223,21 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         
         try:
             session.queue_declare(queue="q1", durable='true', passive='true')
             self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
         
         try:
             session.queue_declare(queue="q2", exclusive='true')
             self.fail("ACL should deny queue create request with name=q2 exclusive=true");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code) 
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
         
         try:
             session.queue_declare(queue="q3", exclusive='true')
@@ -271,14 +251,14 @@
             self.fail("ACL should deny queue query request for q3");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
         
         try:
             session.queue_purge(queue="q3")
             self.fail("ACL should deny queue purge request for q3");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
             
         try:
             session.queue_purge(queue="q4")
@@ -291,7 +271,7 @@
             self.fail("ACL should deny queue delete request for q4");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
             
         try:
             session.queue_delete(queue="q3")
@@ -319,21 +299,21 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         
         try:
             session.exchange_declare(exchange='testEx', durable='true', passive='true')
             self.fail("ACL should deny exchange create request with name=testEx durable=true passive=true");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
        
         try:
             session.exchange_declare(exchange='ex1', type='direct')
             self.fail("ACL should deny exchange create request with name=ex1 type=direct");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code) 
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
         
         try:
             session.exchange_declare(exchange='myXml', type='direct')
@@ -347,14 +327,14 @@
             self.fail("ACL should deny queue query request for q3");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
                 
         try:
             session.exchange_bind(exchange='myEx', queue='q1', binding_key='rk1')
             self.fail("ACL should deny exchange bind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code) 
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
 
         try:
             session.exchange_bind(exchange='myXml', queue='q1', binding_key='x')
@@ -366,7 +346,7 @@
             self.fail("ACL should deny exchange unbind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code) 
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
 
         try:
             session.exchange_unbind(exchange='myXml', queue='q1', binding_key='x')
@@ -379,7 +359,7 @@
             self.fail("ACL should deny exchange delete request for myEx");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
             
         try:
             session.exchange_delete(exchange='myXml')
@@ -404,7 +384,7 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         
         
         try:
@@ -420,14 +400,14 @@
             self.fail("ACL should deny message subscriber request for queue='q1'");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
             
         try:
             session.message_subscribe(queue='q2', destination='myq1')
             self.fail("ACL should deny message subscriber request for queue='q2'");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
               
         try:
             session.message_subscribe(queue='q3', destination='myq1')
@@ -453,7 +433,7 @@
         
         self.reload_acl()       
         
-        session = get_session('bob','bob')
+        session = self.get_session('bob','bob')
         
         try:
             session.exchange_declare(exchange='myEx', type='topic')
@@ -468,14 +448,14 @@
             self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk1");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')                        
+            session = self.get_session('bob','bob')                        
             
         try:
             session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
             self.fail("ACL should deny message transfer to name=amq.topic");
         except qpid.session.SessionException, e:
             self.assertEqual(530,e.args[0].error_code)
-            session = get_session('bob','bob')
+            session = self.get_session('bob','bob')
                         
         try:
             session.message_transfer(destination="myEx", message=Message(props,"Test"))
@@ -489,13 +469,4 @@
             session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
         except qpid.session.SessionException, e:
             if (530 == e.args[0].error_code):
-                self.fail("ACL should allow message transfer to exchange amq.direct"); 
-                        
-                        
-if __name__ == '__main__':
-    args = sys.argv[1:]
-    #need to remove the extra options from args as test runner doesn't recognize them
-    extract_args("--port", args)
-    args.append("acl") 
-    
-    if not testrunner.run(args): sys.exit(1)                
+                self.fail("ACL should allow message transfer to exchange amq.direct");

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py Mon Oct  5 11:29:03 2009
@@ -20,44 +20,21 @@
 
 import sys
 import os
-from qpid.testlib import TestBase010, testrunner
+from qpid.testlib import TestBase010
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from time import sleep
 
-def add_module(args=sys.argv[1:]):
-    for a in args:
-        if a.startswith("cli"):
-            return False
-    return True
-
-def scan_args(name, default=None, args=sys.argv[1:]):
-    if (name in args):
-        pos = args.index(name)
-        return args[pos + 1]
-    elif default:
-        return default
-    else:
-        print "Please specify extra argument: %s" % name
-        sys.exit(2)
-
-def extract_args(name, args):
-    if (name in args):
-        pos = args.index(name)
-        del args[pos:pos+2]
-    else:
-        return None
-
-def remote_host():
-    return scan_args("--remote-host", "localhost")
+class CliTests(TestBase010):
 
-def remote_port():
-    return int(scan_args("--remote-port"))
+    def remote_host(self):
+        return self.defines.get("remote-host", "localhost")
 
-def cli_dir():
-    return scan_args("--cli-dir")
+    def remote_port(self):
+        return int(self.defines["remote-port"])
 
-class CliTests(TestBase010):
+    def cli_dir(self):
+        return self.defines["cli-dir"]
 
     def makeQueue(self, qname, arguments):
         ret = os.system(self.command(" add queue " + qname + " " + arguments))
@@ -150,15 +127,15 @@
         self.startQmf();
         qmf = self.qmf
 
-        command = cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\
-            (testrunner.port, remote_host(), remote_port())
+        command = self.cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\
+            (self.broker.port, self.remote_host(), self.remote_port())
         ret = os.system(command)
         self.assertEqual(ret, 0)
 
         links = qmf.getObjects(_class="link")
         found = False
         for link in links:
-            if link.port == remote_port():
+            if link.port == self.remote_port():
                 found = True
         self.assertEqual(found, True)
 
@@ -174,18 +151,4 @@
         return None
 
     def command(self, arg = ""):
-        return cli_dir() + "/qpid-config -a localhost:%d" % testrunner.port + " " + arg
-
-
-if __name__ == '__main__':
-    args = sys.argv[1:]
-    #need to remove the extra options from args as test runner doesn't recognise them
-    extract_args("--remote-port", args)
-    extract_args("--remote-host", args)
-    extract_args("--cli-dir", args)
-
-    if add_module():
-        #add module(s) to run to testrunners args
-        args.append("cli_tests") 
-    
-    if not testrunner.run(args): sys.exit(1)
+        return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk Mon Oct  5 11:29:03 2009
@@ -29,44 +29,47 @@
 
 
 # ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS += \
-    ais_check \
-	run_cluster_tests \
-	federated_cluster_test \
+TESTS +=					\
+	ais_check				\
+	test_watchdog				\
+	run_cluster_tests			\
+	federated_cluster_test			\
 	clustered_replication_test
-	
-EXTRA_DIST += \
-	ais_check \
-	start_cluster \
-	stop_cluster \
-	restart_cluster \
-	cluster_python_tests \
-	cluster_python_tests_failing.txt \
-  	federated_cluster_test \
-  	clustered_replication_test \
-  	run_cluster_tests \
-  	run_long_cluster_tests \
-  	testlib.py \
-  	cluster_tests.py \
-  	long_cluster_tests.py
-  	
-
-LONG_TESTS += \
-	run_long_cluster_tests \
-	start_cluster \
-	cluster_python_tests \
+
+EXTRA_DIST +=					\
+	ais_check				\
+	test_watchdog				\
+	start_cluster				\
+	stop_cluster				\
+	restart_cluster				\
+	cluster_python_tests			\
+	cluster_python_tests_failing.txt	\
+	federated_cluster_test			\
+	clustered_replication_test		\
+	run_cluster_tests			\
+	run_long_cluster_tests			\
+	testlib.py				\
+	cluster_tests.py			\
+	long_cluster_tests.py
+
+LONG_TESTS +=					\
+	run_long_cluster_tests			\
+	start_cluster				\
+	cluster_python_tests			\
 	stop_cluster
 
 qpidtest_PROGRAMS += cluster_test
-cluster_test_SOURCES = \
-  	cluster_test.cpp \
-  	unit_test.cpp \
-  	ClusterFixture.cpp \
-  	ClusterFixture.h \
-  	ForkedBroker.h \
-  	ForkedBroker.cpp \
-  	PartialFailure.cpp \
-  	ClusterFailover.cpp
+
+cluster_test_SOURCES =				\
+	cluster_test.cpp			\
+	unit_test.cpp				\
+	ClusterFixture.cpp			\
+	ClusterFixture.h			\
+	ForkedBroker.h				\
+	ForkedBroker.cpp			\
+	PartialFailure.cpp			\
+	ClusterFailover.cpp
+
 cluster_test_LDADD=$(lib_client) $(lib_broker) -lboost_unit_test_framework
 
 qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py Mon Oct  5 11:29:03 2009
@@ -21,10 +21,10 @@
 import os, signal, sys, unittest
 from testlib import TestBaseCluster
 
-class ClusterTests(TestBaseCluster):
+class ShortTests(TestBaseCluster):
     """Basic cluster with async store tests"""
     
-    def test_Cluster_01_Initialization(self):
+    def test_01_Initialization(self):
         """Start a single cluster containing several nodes, and stop it again"""
         try:
             clusterName = "cluster-01"
@@ -34,7 +34,7 @@
             self.killAllClusters(True)
             raise
 
-    def test_Cluster_02_MultipleClusterInitialization(self):
+    def test_02_MultipleClusterInitialization(self):
         """Start several clusters each with several nodes and stop them again"""
         try:
             for i in range(0, 5):
@@ -48,7 +48,7 @@
             self.killAllClusters(True)
             raise
         
-    def test_Cluster_03_AddRemoveNodes(self):
+    def test_03_AddRemoveNodes(self):
         """Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)"""
         try:
             clusterName = "cluster-03"
@@ -68,7 +68,7 @@
             self.killAllClusters(True)
             raise
 
-    def test_Cluster_04_RemoveRestoreNodes(self):
+    def test_04_RemoveRestoreNodes(self):
         """Create a multi-node cluster, then kill some of the nodes and restart them"""
         try:
             clusterName = "cluster-04"
@@ -95,7 +95,7 @@
             self.killAllClusters(True)
             raise
         
-    def test_Cluster_05_KillAllNodesThenRecover(self):
+    def test_05_KillAllNodesThenRecover(self):
         """Create a multi-node cluster, then kill *all* nodes, then restart the cluster"""
         try:
             clusterName = "cluster-05"
@@ -107,7 +107,7 @@
             self.killAllClusters(True)
             raise
     
-    def test_Cluster_06_PublishConsume(self):
+    def test_06_PublishConsume(self):
         """Publish then consume 100 messages from a single cluster"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"])
@@ -117,7 +117,7 @@
             self.killAllClusters(True)
             raise
     
-    def test_Cluster_07_MultiplePublishConsume(self):
+    def test_07_MultiplePublishConsume(self):
         """Staggered publish and consume on a single cluster"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"])
@@ -135,7 +135,7 @@
             self.killAllClusters(True)
             raise
     
-    def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
+    def test_08_MsgPublishConsumeAddRemoveNodes(self):
         """Staggered publish and consume interleaved with adding and removing nodes on a single cluster"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"])
@@ -159,7 +159,7 @@
             self.killAllClusters(True)
             raise
      
-    def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
+    def test_09_MsgPublishConsumeRemoveRestoreNodes(self):
         """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"])
@@ -184,7 +184,7 @@
             self.killAllClusters(True)
             raise
    
-    def test_Cluster_10_LinearNodeKillCreateProgression(self):
+    def test_10_LinearNodeKillCreateProgression(self):
         """Publish and consume messages while linearly killing all original nodes and replacing them with new ones"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"])
@@ -204,7 +204,7 @@
             self.killAllClusters(True)
             raise
     
-    def test_Cluster_11_CircularNodeKillRestoreProgression(self):
+    def test_11_CircularNodeKillRestoreProgression(self):
         """Publish and consume messages while circularly killing all original nodes and restoring them again"""
         try:
             dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"])
@@ -226,7 +226,7 @@
             self.killAllClusters(True)
             raise
         
-    def test_Cluster_12_KillAllNodesRecoverMessages(self):
+    def test_12_KillAllNodesRecoverMessages(self):
         """Create a cluster, add and delete messages, kill all nodes then recover cluster and messages"""
         if not self._storeEnable:
             print " No store loaded, skipped"
@@ -253,7 +253,7 @@
             self.killAllClusters(True)
             raise         
     
-    def test_Cluster_13_TopicExchange(self):
+    def test_13_TopicExchange(self):
         """Create topic exchange in a cluster and make sure it behaves correctly"""
         try:
             topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
@@ -290,7 +290,80 @@
             self.killAllClusters(True)
             raise  
      
-    def test_Cluster_14_FanoutExchange(self):
+    def test_14_FanoutExchange(self):
+        """Create fanout exchange in a cluster and make sure it behaves correctly"""
+        try:
+            fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
+            fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList)
+            # Place initial 20 messages, retrieve 10
+            fh.sendMsgs(20)
+            fh.receiveMsgs(10)
+            # Kill and add some nodes
+            fh.killNode(0)
+            fh.killNode(2)
+            fh.addNodes(2)
+            # Place another 20 messages, retrieve 20
+            fh.sendMsgs(20)
+            fh.receiveMsgs(20)
+            # Kill and add another node
+            fh.killNode(4)
+            fh.addNodes()
+            # Add another 2 queues
+            fh.addQueues(["test-queue-14-D", "test-queue-14-E"])
+            # Place another 20 messages, retrieve 20
+            fh.sendMsgs(20)
+            fh.receiveMsgs(20)     
+            # Kill all nodes but one
+            fh.killNode(1)
+            fh.killNode(3)
+            fh.killNode(6)
+            # Check messages
+            fh.finalizeTest()
+        except:
+            self.killAllClusters(True)
+            raise
+
+class LongTests(TestBaseCluster):
+    """Basic cluster with async store tests"""
+    
+    def test_01_TopicExchange(self):
+        """Create topic exchange in a cluster and make sure it behaves correctly"""
+        try:
+            topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
+            th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList)
+             # Place initial messages
+            th.sendMsgs("C.hello.A", 10)
+            th.sendMsgs("hello.world", 10) # matches none of the queues
+            th.sendMsgs("D.hello.A", 10)
+            th.sendMsgs("hello.B", 20)
+            th.sendMsgs("D.hello", 20)
+            # Kill and add some nodes
+            th.killNode(0)
+            th.killNode(2)
+            th.addNodes(2)
+            # Pull 10 messages from each queue
+            th.receiveMsgs(10)
+            # Kill and add another node
+            th.killNode(4)
+            th.addNodes()
+            # Add two more queues
+            th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
+            # Place more messages
+            th.sendMsgs("C.bye.A", 10)
+            th.sendMsgs("hello.bye", 20) # matches none of the queues
+            th.sendMsgs("hello.bye.B", 20)
+            th.sendMsgs("D.bye", 20)
+            # Kill all nodes but one
+            th.killNode(1)
+            th.killNode(3)
+            th.killNode(6)
+            # Pull all remaining messages from each queue and check messages
+            th.finalizeTest()
+        except:
+            self.killAllClusters(True)
+            raise  
+     
+    def test_02_FanoutExchange(self):
         """Create fanout exchange in a cluster and make sure it behaves correctly"""
         try:
             fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c Mon Oct  5 11:29:03 2009
@@ -26,5 +26,5 @@
  */
 
 #include <stdio.h>
-void* dlclose(void* handle) {}
+void* dlclose(void* handle) { return 0; }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org