You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/06 23:47:25 UTC

svn commit: r1165887 - in /qpid/branches/qpid-2920-1/qpid/cpp/src: qpid/broker/ qpid/cluster/ qpid/cluster/exp/ tests/

Author: aconway
Date: Tue Sep  6 21:47:25 2011
New Revision: 1165887

URL: http://svn.apache.org/viewvc?rev=1165887&view=rev
Log:
QPID-2920: Initial stab at time-based queue sharing.

Modified:
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/types.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/ais_check
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep  6 21:47:25 2011
@@ -1293,12 +1293,13 @@ void Queue::UsageBarrier::destroy()
 
 // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
 void Queue::stop() {
+    QPID_LOG(critical, "FIXME Queue stopped " << getName());
     // FIXME aconway 2011-05-25: rename dispatching - acquiring?
     dispatching.stop();
 }
 
 void Queue::start() {
-    QPID_LOG(critical, "FIXME start context=" << clusterContext);
+    QPID_LOG(critical, "FIXME Queue started " << getName());
     assert(clusterContext);      // FIXME aconway 2011-06-08: XXX
     dispatching.start();
     notifyListener();

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Tue Sep  6 21:47:25 2011
@@ -109,7 +109,6 @@ void BrokerContext::routed(const boost::
 
 void BrokerContext::acquire(const broker::QueuedMessage& qm) {
     if (tssNoReplicate) return;
-    QueueContext::get(*qm.queue)->acquire();
     core.mcast(ClusterMessageAcquireBody(
                    ProtocolVersion(), qm.queue->getName(), qm.position));
 }
@@ -177,15 +176,12 @@ void BrokerContext::unbind(broker::Queue
 // n is the number of consumers including the one just added.
 // FIXME aconway 2011-06-27: rename, conflicting terms.
 void BrokerContext::consume(broker::Queue& q, size_t n) {
-    if (n == 1) {
-        // FIXME aconway 2011-06-27: should be on QueueContext for symmetry?
-        core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName()));
-    }
+    QueueContext::get(q)->consume(n);
 }
 
 // n is the number of consumers after the cancel.
 void BrokerContext::cancel(broker::Queue& q, size_t n) {
-    if (n == 0) QueueContext::get(q)->unsubscribed();
+    QueueContext::get(q)->cancel(n);
 }
 
 void BrokerContext::empty(broker::Queue& ) {
@@ -196,10 +192,7 @@ void BrokerContext::stopped(broker::Queu
     boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
     // Don't forward the stopped call if the queue does not yet have a cluster context
     // this when the queue is first created locally.
-    if (qc){
-        QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName());
-        qc->stopped();
-    }
+    if (qc) qc->stopped();
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Tue Sep  6 21:47:25 2011
@@ -21,79 +21,101 @@
 
 #include "QueueContext.h"
 #include "Multicaster.h"
+#include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/ClusterQueueResubscribeBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
 #include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
-
+#include "qpid/sys/Timer.h"
 
 namespace qpid {
 namespace cluster {
 
+
+class OwnershipTimeout : public sys::TimerTask {
+    QueueContext& queueContext;
+
+  public:
+    OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
+        TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
+
+    // FIXME aconway 2011-07-27: thread safety on deletion?
+    void fire() { queueContext.timeout(); }
+};
+
 QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
-    : owner(NOT_OWNER), count(0), queue(q), mcast(m)
+    : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
 {
-    QPID_LOG(debug, "Assign cluster context to queue " << q.getName());
-    q.stop();                   // Initially stopped. Must all before setClusterContext
     q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
-
+    q.stop();  // Initially stopped.
 }
 
-// Called by QueueReplica in deliver thread.
-void QueueContext::sharedOwner(size_t limit) {
-    QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get());
+QueueContext::~QueueContext() {
+    // FIXME aconway 2011-07-27: revisit shutdown logic.
+    // timeout() could be called concurrently with destructor.
     sys::Mutex::ScopedLock l(lock);
-    count = limit;
-    if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
-    owner = SHARED_OWNER;
+    timerTask->cancel();
 }
 
-// Called by QueueReplica in deliver thread.
-void QueueContext::soleOwner() {
-    QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get());
+void QueueContext::replicaState(QueueOwnership state) {
     sys::Mutex::ScopedLock l(lock);
-    count = 0;
-    if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
-    owner = SOLE_OWNER;
-}
-
-// Called by BrokerContext in connection thread(s) on acquiring a message
-void QueueContext::acquire() {
-    bool stop = false;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own.
-        QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName()
-                 << " owner="  << owner << " count=" << count);
-        if (owner == SHARED_OWNER) {
-            // Note count could be 0 if there are concurrent calls to acquire.
-            if (count && --count == 0) {
-                stop = true;
-            }
+    switch (state) {
+      case UNSUBSCRIBED:
+      case SUBSCRIBED:
+        break;
+      case SOLE_OWNER:
+        queue.start();
+        if (timerTask) {        // no need for timeout.
+            timerTask->cancel();
+            timerTask = 0;
         }
+        break;
+      case SHARED_OWNER:
+        queue.start();
+        if (timerTask) timerTask->cancel();
+        // FIXME aconway 2011-07-28: configurable interval.
+        timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
+        timer.add(timerTask);
+        break;
     }
-    // FIXME aconway 2011-06-28: could have multiple stop() threads...
-    if (stop) queue.stop();
 }
 
-// Callback set up by queue.stop()
+// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+
+void QueueContext::consume(size_t n) {
+    sys::Mutex::ScopedLock l(lock);
+    consumers = n;
+    if (n == 1) mcast.mcast(
+        framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
+}
+
+void QueueContext::cancel(size_t n) {
+    sys::Mutex::ScopedLock l(lock);
+    consumers = n;
+    if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock?
+}
+
+void QueueContext::timeout() {
+    QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
+    queue.stop();
+}
+
+
+// Callback set up by queue.stop(), called when no threads are dispatching from the queue.
+// Release the queue.
 void QueueContext::stopped() {
     sys::Mutex::ScopedLock l(lock);
-    if (owner == NOT_OWNER) {
+    // FIXME aconway 2011-07-28: review thread safety of state.
+    // Deffered call to stopped doesn't sit well.
+    // queueActive is invaled while stop is in progress?
+    if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
-    } else {
-        owner = NOT_OWNER;
+    else
         mcast.mcast(framing::ClusterQueueResubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
-    }
-}
-
-void QueueContext::unsubscribed() {
-    QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName());
-    queue.stop();
-    sys::Mutex::ScopedLock l(lock);
-    owner = NOT_OWNER;
 }
 
 boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
@@ -102,4 +124,5 @@ boost::intrusive_ptr<QueueContext> Queue
 }
 
 
+
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Tue Sep  6 21:47:25 2011
@@ -24,10 +24,11 @@
 
 
 #include <qpid/RefCounted.h>
+#include "qpid/sys/Time.h"
 #include <qpid/sys/Mutex.h>
+#include "qpid/cluster/types.h"
 #include <boost/intrusive_ptr.hpp>
 
-
 // FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on
 // class broker::Cluster::Queue. This becomes the cluster context.
 
@@ -35,55 +36,58 @@ namespace qpid {
 namespace broker {
 class Queue;
 }
+namespace sys {
+class Timer;
+class TimerTask;
+}
+
 namespace cluster {
 
 class Multicaster;
 
  /**
  * Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status
+ * Manages the local queue start/stop status.
  *
- * Thread safe: Called by connection and dispatch threads.
+ * Thread safe: Called by connection, dispatch and timer threads.
  */
 class QueueContext : public RefCounted {
-    // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr?
   public:
     QueueContext(broker::Queue& q, Multicaster& m);
+    ~QueueContext();
 
-    /** Sharing ownership of queue, can acquire up to limit before releasing.
-     * Called in deliver thread.
-     */
-    void sharedOwner(size_t limit);
-
-    /** Sole owner of queue, no limits to acquiring */
-    void soleOwner();
+    /** Replica state has changed, called in deliver thread. */
+    void replicaState(QueueOwnership);
 
-    /**
-     * Count an acquired message against the limit.
-     * Called from connection threads while consuming messages
+    /** Called when queue is stopped, no threads are dispatching.
+     * Connection or deliver thread.
      */
-    void acquire();
-
-    /** Called if the queue becomes empty, from connection thread. */
-    void empty();
-
-    /** Called when queue is stopped, connection or deliver thread. */
     void stopped();
 
-    /** Called when the last subscription to a queue is cancelled */
-    void unsubscribed();
+    /** Called when a consumer is added to the queue.
+     *@param n: nubmer of consumers after new one is added.
+     */
+    void consume(size_t n);
+
+    /** Called when a consumer is cancelled on the queue.
+     *@param n: nubmer of consumers after the cancel.
+     */
+    void cancel(size_t n);
 
     /** Get the context for a broker queue. */
     static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
 
+    /** Called when the timer runs out: stop the queue. */
+    void timeout();
+
   private:
-    void release();
+    sys::Timer& timer;
 
     sys::Mutex lock;
-    enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner;
-    size_t count;               // Count of dequeues remaining, 0 means no limit.
     broker::Queue& queue;       // FIXME aconway 2011-06-08: should be shared/weak ptr?
     Multicaster& mcast;
+    boost::intrusive_ptr<sys::TimerTask> timerTask;
+    size_t consumers;
 
     // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
 };

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Tue Sep  6 21:47:25 2011
@@ -44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o
     return o;
 }
 
-std::ostream& operator<<(std::ostream& o, QueueReplica::State s) {
+std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
     static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
     return o << tags[s];
 }
@@ -58,13 +58,13 @@ std::ostream& operator<<(std::ostream& o
 // FIXME aconway 2011-05-17: error handling for asserts.
 
 void QueueReplica::subscribe(const MemberId& member) {
-    State before = getState();
+    QueueOwnership before = getState();
     subscribers.push_back(member);
     update(before);
 }
 
 void QueueReplica::unsubscribe(const MemberId& member) {
-    State before = getState();
+    QueueOwnership before = getState();
     MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
     if (i != subscribers.end()) {
         subscribers.erase(i, subscribers.end());
@@ -74,30 +74,20 @@ void QueueReplica::unsubscribe(const Mem
 
 void QueueReplica::resubscribe(const MemberId& member) {
     assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
-    State before = getState();
+    QueueOwnership before = getState();
     subscribers.pop_front();
     subscribers.push_back(member);
     update(before);
 }
 
-void QueueReplica::update(State before) {
-    const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable
-    State after = getState();
-    if (before == after) return;
+void QueueReplica::update(QueueOwnership before) {
     QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
-    switch (after) {
-      case UNSUBSCRIBED: break;
-      case SUBSCRIBED: break;
-      case SOLE_OWNER:
-        context->soleOwner();
-        break;
-      case SHARED_OWNER:
-        context->sharedOwner(acquireLimit);
-        break;
-    }
+    QueueOwnership after = getState();
+    if (before == after) return;
+    context->replicaState(after);
 }
 
-QueueReplica::State QueueReplica::getState() const {
+QueueOwnership QueueReplica::getState() const {
     if (isOwner())
         return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
     return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h Tue Sep  6 21:47:25 2011
@@ -35,7 +35,6 @@ class Queue;
 }
 
 namespace cluster {
-class QueueHandler;
 class QueueContext;
 
 /**
@@ -56,15 +55,9 @@ class QueueReplica : public RefCounted
     void resubscribe(const MemberId&);
 
   private:
-    enum State {
-        UNSUBSCRIBED,
-        SUBSCRIBED,
-        SOLE_OWNER,
-        SHARED_OWNER
-    };
 
   friend class PrintSubscribers;
-  friend std::ostream& operator<<(std::ostream&, State);
+  friend std::ostream& operator<<(std::ostream&, QueueOwnership);
   friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
 
     typedef std::deque<MemberId> MemberQueue;
@@ -74,10 +67,10 @@ class QueueReplica : public RefCounted
     MemberId self;
     boost::intrusive_ptr<QueueContext> context;
 
-    State getState() const;
+    QueueOwnership getState() const;
     bool isOwner() const;
     bool isSubscriber(const MemberId&) const;
-    void update(State before);
+    void update(QueueOwnership before);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Sep  6 21:47:25 2011
@@ -72,7 +72,6 @@ void WiringHandler::createQueue(const st
     assert(q);                  // FIXME aconway 2011-05-10: error handling.
     // TODO aconway 2011-05-10: if we implement multi-group for queues then
     // this call is a problem: comes from wiring delivery thread, not queues.
-    // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers..
     queueHandler->add(q);
     QPID_LOG(debug, "cluster: create queue " << q->getName());
 }

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/types.h?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/types.h Tue Sep  6 21:47:25 2011
@@ -82,6 +82,16 @@ std::ostream& operator<<(std::ostream&, 
 /** Number to identify a message being routed. */
 typedef uint32_t RoutingId;
 
+// FIXME aconway 2011-07-28: can we put these 2 back  in the
+// QueueReplica & QueueContext?
+/** State of a queue with respect to a cluster member. */
+enum QueueOwnership {
+    UNSUBSCRIBED,
+    SUBSCRIBED,
+    SOLE_OWNER,
+    SHARED_OWNER
+};
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_TYPES_H*/

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/BrokerClusterCalls.cpp Tue Sep  6 21:47:25 2011
@@ -249,8 +249,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
-    // Note: empty is called once for each receiver.
-    BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+    // FIXME aconway 2011-07-25: empty called once per receiver?
     BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
     BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
     BOOST_CHECK_EQUAL(h.size(), i);

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/ais_check?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/ais_check (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/ais_check Tue Sep  6 21:47:25 2011
@@ -18,8 +18,6 @@
 # under the License.
 #
 
-srcdir=`dirname $0`
-
 # Check AIS requirements and run tests if found.
 ps -u root | grep 'aisexec\|corosync' >/dev/null || {
     echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is not running.

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py?rev=1165887&r1=1165886&r2=1165887&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/cluster2_tests.py Tue Sep  6 21:47:25 2011
@@ -137,6 +137,12 @@ class Cluster2Tests(BrokerTest):
 
         # FIXME aconway 2010-10-29: test unbind, may need to use old API.
 
+    def duration(self):
+        d = self.config.defines.get("DURATION")
+        if d: return float(d)*60
+        else: return 3                  # Default is to be quick
+
+
     def test_dequeue_mutex(self):
         """Ensure that one and only one consumer receives each dequeued message."""
         class Receiver(Thread):
@@ -163,13 +169,12 @@ class Cluster2Tests(BrokerTest):
         for r in receivers: r.start()
 
         n = 0
-        t = time.time() + 1             # Send for 1 second.
+        t = time.time() + self.duration()
         while time.time() < t:
             sender.send(str(n))
             n += 1
         for r in receivers: r.join();
-        print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
-        for r in receivers: assert len(r.messages) # At least one message to each
+        for r in receivers: len(r.messages) > n/6 # Fairness test.
         messages = [int(m.content) for r in receivers for m in r.messages ]
         messages.sort()
         self.assertEqual(range(n), messages)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org