You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/16 22:16:54 UTC

svn commit: r1171756 - in /qpid/branches/qpid-2920-active/qpid/cpp: src/ src/qpid/broker/ src/qpid/cluster/exp/ src/qpid/sys/ src/tests/ xml/

Author: aconway
Date: Fri Sep 16 20:16:53 2011
New Revision: 1171756

URL: http://svn.apache.org/viewvc?rev=1171756&view=rev
Log:
QPID-2920: New cluster release/requeue.

Almost functional, seeing sporadic hangs in qpid-cpp-benchmark with two brokers:
qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000

Added:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h   (with props)
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk Fri Sep 16 20:16:53 2011
@@ -83,6 +83,7 @@ cluster_la_SOURCES =				\
   qpid/cluster/OutputInterceptor.h		\
   qpid/cluster/PollerDispatch.cpp		\
   qpid/cluster/PollerDispatch.h			\
+  qpid/cluster/PrettyId.h			\
   qpid/cluster/ProxyInputHandler.h		\
   qpid/cluster/Quorum.h				\
   qpid/cluster/InitialStatusMap.h		\

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Cluster.h Fri Sep 16 20:16:53 2011
@@ -57,8 +57,7 @@ class Cluster
 
     /** A message is delivered to a queue.
      * Called before actually pushing the message to the queue.
-     *@return If true the message should be pushed to the queue now.
-     * otherwise the cluster code will push the message when it is replicated.
+     *@return If true the message should be enqueued now, false for delayed enqueue.
      */
     virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
 
@@ -71,8 +70,10 @@ class Cluster
     /** A locally-acquired message is released by the consumer and re-queued. */
     virtual void release(const QueuedMessage&) = 0;
 
-    /** A message is removed from the queue. */
-    virtual void dequeue(const QueuedMessage&) = 0;
+    /** A message is removed from the queue.
+     *@return true if the message should be dequeued, false for delayed dequeue.
+     */
+    virtual bool dequeue(const QueuedMessage&) = 0;
 
     // Consumers
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/NullCluster.h Fri Sep 16 20:16:53 2011
@@ -42,7 +42,7 @@ class NullCluster : public Cluster
     virtual void routed(const boost::intrusive_ptr<Message>&) {}
     virtual void acquire(const QueuedMessage&) {}
     virtual void release(const QueuedMessage&) {}
-    virtual void dequeue(const QueuedMessage&) {}
+    virtual bool dequeue(const QueuedMessage&) { return false; }
 
     // Consumers
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Fri Sep 16 20:16:53 2011
@@ -113,7 +113,7 @@ Queue::Queue(const string& _name, bool _
     deleted(false),
     barrier(*this),
     autoDeleteTimeout(0),
-    dispatching(boost::bind(&Queue::acquireStopped,this))
+    consuming(boost::bind(&Queue::consumingStopped,this))
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -154,7 +154,7 @@ void Queue::deliver(boost::intrusive_ptr
     // Check for deferred delivery in a cluster.
     if (broker && broker->deferDelivery(name, msg))
         return;
-    // Same thing but for the new cluster interface.
+    // Check for deferred delivery with new cluster interface.
     if (broker && !broker->getCluster().enqueue(*this, msg))
         return;
 
@@ -227,39 +227,32 @@ void Queue::requeue(const QueuedMessage&
             }
         }
     }
-
-    if (broker) broker->getCluster().release(msg);
+    if (broker) broker->getCluster().release(msg); // FIXME aconway 2011-09-12: review. rename requeue?
     copy.notify();
 }
 
 /** Mark a scope that acquires a message.
  *
- * ClusterAcquireScope is declared before are taken.  The calling
- * function sets qmsg with the lock held, but the call to
- * Cluster::acquire() will happen after the lock is released.
+ * ClusterAcquireScope is declared before locks are taken.  The
+ * calling function sets qmsg with the lock held, but the call to
+ * Cluster::acquire() will happen after the lock is released in
+ * ~ClusterAcquireScope().
  *
  * Also marks a Stoppable as busy for the duration of the scope.
  **/
 struct ClusterAcquireScope {
-    Broker* broker;
-    Queue& queue;
     QueuedMessage qmsg;
 
-    ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {}
+    ClusterAcquireScope() {}
 
     ~ClusterAcquireScope() {
-        if (broker) {
-            // FIXME aconway 2011-06-27:  Move to QueueContext.
-            // Avoid the indirection via queuename.
-            if (qmsg.queue) broker->getCluster().acquire(qmsg);
-            else broker->getCluster().empty(queue);
-        }
+        if (qmsg.queue) qmsg.queue->getBroker()->getCluster().acquire(qmsg);
     }
 };
 
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
-    ClusterAcquireScope acquireScope(*this); // Outside lock
+    ClusterAcquireScope acquireScope; // Outside lock
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -312,13 +305,13 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
-        Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming
-        if (!stopper) {
+        Stoppable::Scope consumeScope(consuming);
+        if (!consumeScope) {
             QPID_LOG(trace, "Queue is stopped: " << name);
             listeners.addListener(c);
             return NO_MESSAGES;
         }
-        ClusterAcquireScope acquireScope(*this); // Outside the lock
+        ClusterAcquireScope acquireScope; // Outside the lock
         Mutex::ScopedLock locker(messageLock);
         if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -461,7 +454,7 @@ void Queue::cancel(Consumer::shared_ptr 
 }
 
 QueuedMessage Queue::get(){
-    ClusterAcquireScope acquireScope(*this); // Outside lock
+    ClusterAcquireScope acquireScope; // Outside lock
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
     if (messages->pop(msg)) acquireScope.qmsg = msg;
@@ -709,6 +702,10 @@ void Queue::enqueueAborted(boost::intrus
 // return true if store exists,
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
+    // FIXME aconway 2011-09-13: new cluster needs tx/dtx support.
+    if (!ctxt && broker)
+        if (!broker->getCluster().dequeue(msg)) return false;
+
     ScopedUse u(barrier);
     if (!u.acquired) return false;
     {
@@ -719,8 +716,6 @@ bool Queue::dequeue(TransactionContext* 
         }
     }
 
-    if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
-
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -737,7 +732,7 @@ bool Queue::dequeue(TransactionContext* 
 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
-    if (broker) broker->getCluster().dequeue(msg); // Outside lock
+    // FIXME aconway 2011-09-13: new cluster needs TX support.
     Mutex::ScopedLock locker(messageLock);
     dequeued(msg);
     if (mgmtObject != 0) {
@@ -919,7 +914,7 @@ void Queue::notifyDeleted()
     set.notifyAll();
 }
 
-void Queue::acquireStopped() {
+void Queue::consumingStopped() {
     if (broker) broker->getCluster().stopped(*this);
 }
 
@@ -1291,15 +1286,13 @@ void Queue::UsageBarrier::destroy()
     while (count) parent.messageLock.wait();
 }
 
-// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
-void Queue::stop() {
+void Queue::stopConsumers() {
     QPID_LOG(trace, "Queue stopped: " << getName());
-    // FIXME aconway 2011-05-25: rename dispatching - acquiring?
-    dispatching.stop();
+    consuming.stop();
 }
 
-void Queue::start() {
+void Queue::startConsumers() {
     QPID_LOG(trace, "Queue started: " << getName());
-    dispatching.start();
+    consuming.start();
     notifyListener();
 }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.h Fri Sep 16 20:16:53 2011
@@ -132,9 +132,8 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
-    // Allow dispatching consumer threads to be stopped. Used by cluster
-    sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring?
-    boost::intrusive_ptr<RefCounted> clusterContext;
+    sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster
+    boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -182,7 +181,7 @@ class Queue : public boost::enable_share
 
     void checkNotDeleted();
     void notifyDeleted();
-    void acquireStopped();
+    void consumingStopped();
 
   public:
 
@@ -396,10 +395,10 @@ class Queue : public boost::enable_share
     /** Stop consumers. Return when all consumer threads are stopped.
      *@pre Queue is active and not already stopping.
      */
-    void stop();
+    void stopConsumers();
 
     /** Start consumers. */
-    void start();
+    void startConsumers();
 
     /** Context information used in a cluster. */
     boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Fri Sep 16 20:16:53 2011
@@ -93,10 +93,9 @@ bool BrokerContext::enqueue(Queue& queue
         core.getRoutingMap().put(tssRoutingId, msg);
     }
     core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
-    // TODO aconway 2010-10-21: configable option for strict (wait
-    // for CPG deliver to do local deliver) vs.  loose (local deliver
-    // immediately).
-    return false;
+    // TODO aconway 2010-10-21: review delivery options: strict (wait
+    // for CPG delivery) vs loose (local deliver immediately).
+    return false; // Strict delivery, cluster will call Queue deliver.
 }
 
 void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
@@ -113,25 +112,27 @@ void BrokerContext::acquire(const broker
                    ProtocolVersion(), qm.queue->getName(), qm.position));
 }
 
-// FIXME aconway 2011-05-24: need to handle acquire and release.
-// Dequeue in the wrong place?
-void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
-    if (tssNoReplicate) return;
-    core.mcast(ClusterMessageDequeueBody(
-                   ProtocolVersion(), qm.queue->getName(), qm.position));
+bool BrokerContext::dequeue(const broker::QueuedMessage& qm) {
+    if (!tssNoReplicate)
+        core.mcast(ClusterMessageDequeueBody(
+                       ProtocolVersion(), qm.queue->getName(), qm.position));
+    return false;               // FIXME aconway 2011-09-14: needed?
 }
 
-void BrokerContext::release(const broker::QueuedMessage& ) {
-    // FIXME aconway 2011-05-24: TODO
+// FIXME aconway 2011-09-14: rename requeue?
+void BrokerContext::release(const broker::QueuedMessage& qm) {
+    if (!tssNoReplicate)
+        core.mcast(ClusterMessageReleaseBody(
+                       ProtocolVersion(), qm.queue->getName(), qm.position, qm.payload->getRedelivered()));
 }
 
 // FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
 void BrokerContext::create(broker::Queue& q) {
-    if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit
-    // FIXME aconway 2011-06-08: error handling- if already set...
-    // Create local context immediately, queue will be stopped until replicated.
+    q.stopConsumers();          // FIXME aconway 2011-09-14: Stop queue initially.
+    if (tssNoReplicate) return;
+    assert(!QueueContext::get(q));
     boost::intrusive_ptr<QueueContext> context(
-        new QueueContext(q,core.getMulticaster()));
+        new QueueContext(q, core.getMulticaster()));
     std::string data(q.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     q.encode(buf);
@@ -174,11 +175,12 @@ void BrokerContext::unbind(broker::Queue
 }
 
 // n is the number of consumers including the one just added.
-// FIXME aconway 2011-06-27: rename, conflicting terms.
+// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe?
 void BrokerContext::consume(broker::Queue& q, size_t n) {
     QueueContext::get(q)->consume(n);
 }
 
+// FIXME aconway 2011-09-13: rename unsubscribe?
 // n is the number of consumers after the cancel.
 void BrokerContext::cancel(broker::Queue& q, size_t n) {
     QueueContext::get(q)->cancel(n);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Fri Sep 16 20:16:53 2011
@@ -57,7 +57,7 @@ class BrokerContext : public broker::Clu
     bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
     void routed(const boost::intrusive_ptr<broker::Message>&);
     void acquire(const broker::QueuedMessage&);
-    void dequeue(const broker::QueuedMessage&);
+    bool dequeue(const broker::QueuedMessage&);
     void release(const broker::QueuedMessage&);
 
     // Consumers

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Fri Sep 16 20:16:53 2011
@@ -68,7 +68,6 @@ void Core::fatal() {
 }
 
 void Core::mcast(const framing::AMQBody& body) {
-    QPID_LOG(trace, "cluster multicast: " << body);
     multicaster.mcast(body);
 }
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp Fri Sep 16 20:16:53 2011
@@ -22,6 +22,7 @@
 #include "Core.h"
 #include "EventHandler.h"
 #include "HandlerBase.h"
+#include "PrettyId.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/cluster/types.h"
 #include "qpid/framing/AMQFrame.h"
@@ -49,17 +50,6 @@ void EventHandler::start() {
     dispatcher.start();
 }
 
-// Print member ID or "self" if member is self
-struct PrettyId {
-    MemberId id, self;
-    PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
-};
-
-std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
-    if (id.id == id.self) return o << "self";
-    else return o << id.id;
-}
-
 // Deliver CPG message.
 void EventHandler::deliver(
     cpg_handle_t /*handle*/,

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/LockedMap.h Fri Sep 16 20:16:53 2011
@@ -39,8 +39,20 @@ class LockedMap
     Value get(const Key& key) const {
         sys::RWlock::ScopedRlock r(lock);
         typename Map::const_iterator i = map.find(key);
-        if (i == map.end()) return Value();
-        else return i->second;
+        return (i == map.end()) ? Value() : i->second;
+    }
+
+    /** Update value with the value for key.
+     *@return true if key was found.
+     */
+    bool get(const Key& key, Value& value) const {
+        sys::RWlock::ScopedRlock r(lock);
+        typename Map::const_iterator i = map.find(key);
+        if (i != map.end())  {
+            value = i->second;
+            return true;
+        }
+        return false;
     }
 
     /** Associate value with key, overwriting any previous value for key. */

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Fri Sep 16 20:16:53 2011
@@ -22,7 +22,9 @@
 #include "Core.h"
 #include "MessageHandler.h"
 #include "BrokerContext.h"
+#include "QueueContext.h"
 #include "EventHandler.h"
+#include "PrettyId.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/QueueRegistry.h"
@@ -72,7 +74,7 @@ void MessageHandler::enqueue(RoutingId r
     else
         msg = memberMap[sender()].routingMap[routingId];
     if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
-                                       << " failed:  unknown message"));
+                                       << " failed: unknown message"));
     BrokerContext::ScopedSuppressReplication ssr;
     queue->deliver(msg);
 }
@@ -84,40 +86,51 @@ void MessageHandler::routed(RoutingId ro
         memberMap[sender()].routingMap.erase(routingId);
 }
 
+// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
+// and scan queue once.
 void MessageHandler::acquire(const std::string& q, uint32_t position) {
     // Note acquires from other members. My own acquires were executed in
     // the connection thread
     if (sender() != self()) {
-        // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext
-        // by broker for possible re-queuing if a broker leaves.
-        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire failed");
         QueuedMessage qm;
         BrokerContext::ScopedSuppressReplication ssr;
         bool ok = queue->acquireMessageAt(position, qm);
-        (void)ok;                   // Avoid unused variable warnings.
-        assert(ok);             // FIXME aconway 2011-08-04: failing this assertion.
+        (void)ok;               // Avoid unused variable warnings.
+        assert(ok);             // FIXME aconway 2011-09-14: error handling
         assert(qm.position.getValue() == position);
         assert(qm.payload);
+        // Save for possible requeue.
+        QueueContext::get(*queue)->acquire(qm);
     }
-}
+    QPID_LOG(trace, "cluster message " << q << "[" << position
+             << "] acquired by " << PrettyId(sender(), self()));
+ }
 
-void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) {
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
     if (sender() == self()) {
         // FIXME aconway 2010-10-28: we should complete the ack that initiated
         // the dequeue at this point, see BrokerContext::dequeue
-        return;
     }
-    boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
-    BrokerContext::ScopedSuppressReplication ssr;
-    // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext.
-    // Do we need to call this? Review with gsim.
-    // QueuedMessage qm;
-    // Get qm from QueueContext?
-    // queue->dequeue(0, qm);
+    else {
+        // FIXME aconway 2011-09-15: new cluster, inefficient: looks up
+        // message by position multiple times?
+        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+        // Remove from the unacked list
+        QueueContext::get(*queue)->dequeue(position);
+        BrokerContext::ScopedSuppressReplication ssr;
+        QueuedMessage qm = queue->find(position);
+        if (qm.queue) queue->dequeue(0, qm);
+    }
 }
 
-void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) {
-    // FIXME aconway 2011-05-24:
+// FIXME aconway 2011-09-14: rename as requeue?
+void MessageHandler::release(const std::string& q, uint32_t position, bool redelivered) {
+    // FIXME aconway 2011-09-15: review release/requeue logic.
+    if (sender() != self()) {
+        boost::shared_ptr<Queue> queue = findQueue(q, "Cluster release failed");
+        QueueContext::get(*queue)->requeue(position, redelivered);
+    }
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Fri Sep 16 20:16:53 2011
@@ -60,7 +60,7 @@ class MessageHandler : public framing::A
     void routed(uint32_t routingId);
     void acquire(const std::string& queue, uint32_t position);
     void dequeue(const std::string& queue, uint32_t position);
-    void release(const std::string& queue, uint32_t position);
+    void release(const std::string& queue, uint32_t position, bool redelivered);
 
   private:
     struct Member {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp Fri Sep 16 20:16:53 2011
@@ -51,7 +51,8 @@ Multicaster::Multicaster(Cpg& cpg_,
     queue.start();
 }
 
-void Multicaster::mcast(const framing::AMQDataBlock& data) {
+void Multicaster::mcast(const framing::AMQFrame& data) {
+    QPID_LOG(trace, "cluster multicast: " << data);
     BufferRef bufRef = buffers.get(data.encodedSize());
     framing::Buffer buf(bufRef.begin(), bufRef.size());
     data.encode(buf);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.h Fri Sep 16 20:16:53 2011
@@ -30,7 +30,7 @@
 namespace qpid {
 
 namespace framing {
-class AMQDataBlock;
+class AMQFrame;
 class AMQBody;
 }
 
@@ -54,7 +54,7 @@ class Multicaster
     );
 
     /** Multicast an event */
-    void mcast(const framing::AMQDataBlock&);
+    void mcast(const framing::AMQFrame&);
     void mcast(const framing::AMQBody&);
 
   private:

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h?rev=1171756&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h Fri Sep 16 20:16:53 2011
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_EXP_PRETTYID_H
+#define QPID_CLUSTER_EXP_PRETTYID_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/cluster/types.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Wrapper for a MemberId that prints as the member ID or the string
+ * "self" if the member is self.
+ */
+struct  PrettyId {
+    MemberId id, self;
+    PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
+};
+
+inline std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
+    if (id.id == id.self) return o << "self";
+    else return o << id.id;
+}
+
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_PRETTYID_H*/

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/PrettyId.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Fri Sep 16 20:16:53 2011
@@ -20,13 +20,16 @@
  */
 
 #include "QueueContext.h"
+
 #include "Multicaster.h"
+#include "BrokerContext.h"      // for ScopedSuppressReplication
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/ClusterQueueResubscribeBody.h"
 #include "qpid/framing/ClusterQueueSubscribeBody.h"
 #include "qpid/framing/ClusterQueueUnsubscribeBody.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Timer.h"
 
@@ -41,7 +44,6 @@ class OwnershipTimeout : public sys::Tim
     OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
         TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
 
-    // FIXME aconway 2011-07-27: thread safety on deletion?
     void fire() { queueContext.timeout(); }
 };
 
@@ -49,32 +51,35 @@ QueueContext::QueueContext(broker::Queue
     : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
 {
     q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
-    q.stop();  // Initially stopped.
 }
 
 QueueContext::~QueueContext() {
-    // FIXME aconway 2011-07-27: revisit shutdown logic.
-    // timeout() could be called concurrently with destructor.
-    sys::Mutex::ScopedLock l(lock);
     if (timerTask) timerTask->cancel();
 }
 
+void QueueContext::cancelTimer(const sys::Mutex::ScopedLock&) {
+    if (timerTask) {        // no need for timeout, sole owner.
+        timerTask->cancel();
+        timerTask = 0;
+    }
+}
+
+// Called by QueueReplica in CPG deliver thread when state changes.
 void QueueContext::replicaState(QueueOwnership state) {
     sys::Mutex::ScopedLock l(lock);
     switch (state) {
       case UNSUBSCRIBED:
       case SUBSCRIBED:
+        cancelTimer(l);
+        queue.stopConsumers();
         break;
       case SOLE_OWNER:
-        queue.start();
-        if (timerTask) {        // no need for timeout.
-            timerTask->cancel();
-            timerTask = 0;
-        }
+        cancelTimer(l);         // Sole owner, no need for timer.
+        queue.startConsumers();
         break;
       case SHARED_OWNER:
-        queue.start();
-        if (timerTask) timerTask->cancel();
+        cancelTimer(l);
+        queue.startConsumers();
         // FIXME aconway 2011-07-28: configurable interval.
         timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
         timer.add(timerTask);
@@ -82,7 +87,7 @@ void QueueContext::replicaState(QueueOwn
     }
 }
 
-// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+// FIXME aconway 2011-07-27: Don't spin token on an empty queue.
 
 // Called in connection threads when a consumer is added
 void QueueContext::consume(size_t n) {
@@ -96,18 +101,19 @@ void QueueContext::consume(size_t n) {
 void QueueContext::cancel(size_t n) {
     sys::Mutex::ScopedLock l(lock);
     consumers = n;
-    if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock?
+    // When consuming threads are stopped, this->stopped will be called.
+    if (n == 0) queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
 }
 
+// Called in timer thread.
 void QueueContext::timeout() {
-    QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
-    queue.stop();
+    // FIXME aconway 2011-09-14: need to deal with stray timeouts.
+    queue.stopConsumers();
     // When all threads have stopped, queue will call stopped()
 }
 
-
-// Callback set up by queue.stop(), called when no threads are dispatching from the queue.
-// Release the queue.
+// Callback set up by queue.stopConsumers() called in connection thread.
+// Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
     sys::Mutex::ScopedLock l(lock);
     // FIXME aconway 2011-07-28: review thread safety of state.
@@ -116,16 +122,33 @@ void QueueContext::stopped() {
     if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
-    else
+    else                        // FIXME aconway 2011-09-13: check if we're owner?
         mcast.mcast(framing::ClusterQueueResubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));
 }
 
+void QueueContext::requeue(uint32_t position, bool redelivered) {
+    // FIXME aconway 2011-09-15: no lock, unacked has its own lock.
+    broker::QueuedMessage qm;
+    if (unacked.get(position, qm)) {
+        unacked.erase(position);
+        if (redelivered) qm.payload->redeliver();
+        BrokerContext::ScopedSuppressReplication ssr;
+        queue.requeue(qm);
+    }
+}
+
+void QueueContext::acquire(const broker::QueuedMessage& qm) {
+    unacked.put(qm.position, qm);
+}
+
+void QueueContext::dequeue(uint32_t position) {
+    unacked.erase(position);
+}
+
 boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
     return boost::intrusive_ptr<QueueContext>(
         static_cast<QueueContext*>(q.getClusterContext().get()));
 }
 
-
-
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Fri Sep 16 20:16:53 2011
@@ -22,7 +22,7 @@
  *
  */
 
-
+#include "LockedMap.h"
 #include <qpid/RefCounted.h>
 #include "qpid/sys/Time.h"
 #include <qpid/sys/Mutex.h>
@@ -35,6 +35,7 @@
 namespace qpid {
 namespace broker {
 class Queue;
+class QueuedMessage;
 }
 namespace sys {
 class Timer;
@@ -60,16 +61,16 @@ class QueueContext : public RefCounted {
     void replicaState(QueueOwnership);
 
     /** Called when queue is stopped, no threads are dispatching.
-     * Connection or deliver thread.
+     * May be called in connection or deliver thread.
      */
     void stopped();
 
-    /** Called when a consumer is added to the queue.
+    /** Called in connection thread when a consumer is added.
      *@param n: nubmer of consumers after new one is added.
      */
     void consume(size_t n);
 
-    /** Called when a consumer is cancelled on the queue.
+    /** Called in connection thread when a consumer is cancelled on the queue.
      *@param n: nubmer of consumers after the cancel.
      */
     void cancel(size_t n);
@@ -77,9 +78,18 @@ class QueueContext : public RefCounted {
     /** Get the context for a broker queue. */
     static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
 
-    /** Called when the timer runs out: stop the queue. */
+    /** Called in timer thread when the timer runs out. */
     void timeout();
 
+    /** Called by MessageHandler to requeue a message. */
+    void requeue(uint32_t position, bool redelivered);
+
+    /** Called by MessageHandler when a message is acquired. */
+    void acquire(const broker::QueuedMessage& qm);
+
+    /** Called by MessageHandler when a message is dequeued. */
+    void dequeue(uint32_t position);
+
   private:
     sys::Timer& timer;
 
@@ -89,7 +99,10 @@ class QueueContext : public RefCounted {
     boost::intrusive_ptr<sys::TimerTask> timerTask;
     size_t consumers;
 
-    // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
+    typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename
+    UnackedMap unacked;
+
+    void cancelTimer(const sys::Mutex::ScopedLock& l);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp Fri Sep 16 20:16:53 2011
@@ -58,6 +58,7 @@ void QueueHandler::left(const MemberId& 
 }
 
 // FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle?
+// FIXME aconway 2011-09-13: called from wiring handler, need to consider for multi-cpg.
 void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
     // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler.
     // FIXME aconway 2011-05-10: assert not already in map.

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Fri Sep 16 20:16:53 2011
@@ -20,6 +20,7 @@
  */
 #include "QueueReplica.h"
 #include "QueueContext.h"
+#include "PrettyId.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
 #include <algorithm>
@@ -30,17 +31,17 @@ namespace cluster {
 QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
                            const MemberId& self_)
     : queue(q), self(self_), context(QueueContext::get(*q))
-{
-    // q is initially stopped.
-}
+{}
 
 struct PrintSubscribers {
     const QueueReplica::MemberQueue& mq;
-    PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {}
+    MemberId self;
+    PrintSubscribers(const QueueReplica::MemberQueue& m, const MemberId& s) : mq(m), self(s) {}
 };
 
 std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
-    copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " "));
+    for (QueueReplica::MemberQueue::const_iterator i = ps.mq.begin();  i != ps.mq.end(); ++i)
+        o << PrettyId(*i, ps.self) << " ";
     return o;
 }
 
@@ -51,12 +52,10 @@ std::ostream& operator<<(std::ostream& o
 
 std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
     o << qr.queue->getName() << "(" << qr.getState() << "): "
-      <<  PrintSubscribers(qr.subscribers);
+      <<  PrintSubscribers(qr.subscribers, qr.getSelf());
     return o;
 }
 
-// FIXME aconway 2011-05-17: error handling for asserts.
-
 void QueueReplica::subscribe(const MemberId& member) {
     QueueOwnership before = getState();
     subscribers.push_back(member);
@@ -73,15 +72,16 @@ void QueueReplica::unsubscribe(const Mem
 }
 
 void QueueReplica::resubscribe(const MemberId& member) {
-    assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
-    QueueOwnership before = getState();
-    subscribers.pop_front();
-    subscribers.push_back(member);
-    update(before);
+    if (member == subscribers.front()) { // FIXME aconway 2011-09-13: should be assert?
+        QueueOwnership before = getState();
+        subscribers.pop_front();
+        subscribers.push_back(member);
+        update(before);
+    }
 }
 
 void QueueReplica::update(QueueOwnership before) {
-    QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
+    QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")");
     QueueOwnership after = getState();
     if (before == after) return;
     context->replicaState(after);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h Fri Sep 16 20:16:53 2011
@@ -36,6 +36,7 @@ class Queue;
 
 namespace cluster {
 class QueueContext;
+struct PrintSubscribers;
 
 /**
  * Queue state that is replicated among all cluster members.
@@ -54,12 +55,9 @@ class QueueReplica : public RefCounted
     void unsubscribe(const MemberId&);
     void resubscribe(const MemberId&);
 
+    MemberId getSelf() const { return self; }
+    
   private:
-
-  friend class PrintSubscribers;
-  friend std::ostream& operator<<(std::ostream&, QueueOwnership);
-  friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
-
     typedef std::deque<MemberId> MemberQueue;
 
     boost::shared_ptr<broker::Queue> queue;
@@ -71,6 +69,11 @@ class QueueReplica : public RefCounted
     bool isOwner() const;
     bool isSubscriber(const MemberId&) const;
     void update(QueueOwnership before);
+
+  friend struct PrintSubscribers;
+  friend std::ostream& operator<<(std::ostream&, QueueOwnership);
+  friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
+  friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h Fri Sep 16 20:16:53 2011
@@ -33,11 +33,12 @@ namespace sys {
  * An activity that may be executed by multiple threads, and can be stopped.
  *
  * Stopping prevents new threads from entering and calls a callback
- * when all busy threads leave.
+ * when all busy threads have left.
  */
 class Stoppable {
   public:
     /**
+     * Initially not stopped.
      *@param stoppedCallback: called when all threads have stopped.
      */
     Stoppable(boost::function<void()> stoppedCallback)
@@ -55,7 +56,7 @@ class Stoppable {
         Stoppable& state;
         bool entered;
       public:
-        Scope(Stoppable& s) : state(s) { entered = s.enter(); }
+        Scope(Stoppable& s) : state(s) { entered = state.enter(); }
         ~Scope() { if (entered) state.exit(); }
         operator bool() const { return entered; }
     };
@@ -69,6 +70,7 @@ class Stoppable {
      */
     void stop() {
         sys::Monitor::ScopedLock l(lock);
+        if (stopped) return;
         stopped = true;
         check();
     }
@@ -81,6 +83,8 @@ class Stoppable {
         stopped = false;
     }
 
+  private:
+
     // Busy thread enters scope
     bool enter() {
         sys::Monitor::ScopedLock l(lock);
@@ -96,8 +100,8 @@ class Stoppable {
         check();
     }
 
-  private:
     void check() {
+        // Called with lock held.
         if (stopped && busy == 0 && notify) notify();
     }
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp Fri Sep 16 20:16:53 2011
@@ -94,8 +94,9 @@ class DummyCluster : public broker::Clus
     virtual void release(const broker::QueuedMessage& qm) {
         if (!isRouting) recordQm("release", qm);
     }
-    virtual void dequeue(const broker::QueuedMessage& qm) {
+    virtual bool dequeue(const broker::QueuedMessage& qm) {
         if (!isRouting) recordQm("dequeue", qm);
+        return false;
     }
 
     // Consumers

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/cluster2_tests.py Fri Sep 16 20:16:53 2011
@@ -159,7 +159,7 @@ class Cluster2Tests(BrokerTest):
                         self.session.acknowledge()
                 except Empty: pass
 
-        cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+        cluster = self.cluster(3, cluster2=True)
         connections = [ b.connect() for  b in cluster]
         sessions = [ c.session() for c in connections ]
         sender = sessions[0].sender("q;{create:always}")

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cpp-benchmark Fri Sep 16 20:16:53 2011
@@ -115,7 +115,9 @@ def start_receive(queue, index, opts, re
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
+    # FIXME aconway 2011-09-15: 
+    # return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
+    return clients.add(Popen(command, stdout=PIPE))
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -128,7 +130,9 @@ def start_send(queue, opts, broker, host
                "--report-total",
                "--report-header=no",
                "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
-               "--sequence=no",
+               # FIXME aconway 2011-09-15: 
+               # "--sequence=no",
+               "--sequence=yes",
                "--flow-control", str(opts.flow_control),
                "--durable", str(opts.durable)
                ]
@@ -166,12 +170,12 @@ def recreate_queues(queues, brokers):
     for q in queues:
         try: s.sender("%s;{delete:always}"%(q)).close()
         except qpid.messaging.exceptions.NotFound: pass
-        # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+        # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
         for b in brokers:
             while queue_exists(q,b): time.sleep(0.1);
-    for q in queues:
-        s.sender("%s;{create:always}"%q)
-        # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+        for q in queues:
+            s.sender("%s;{create:always}"%q)
+        # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
         for b in brokers:
             while not queue_exists(q,b): time.sleep(0.1);
     c.close()
@@ -182,8 +186,6 @@ def print_header(timestamp):
     print "send-tp\t\trecv-tp%s"%latency_header
 
 def parse(parser, lines):               # Parse sender/receiver output
-    for l in lines:
-        fn_val = zip(parser, l)
     return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
 
 def parse_senders(senders):

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp Fri Sep 16 20:16:53 2011
@@ -198,6 +198,7 @@ int main(int argc, char ** argv)
             std::map<std::string,Sender> replyTo;
 
             while (!done && receiver.fetch(msg, timeout)) {
+                cerr << "FIXME " << msg.getProperties()[SN] << endl;
                 if (!started) {
                     // Start the time on receipt of the first message to avoid counting
                     // idle time at process startup.
@@ -207,6 +208,7 @@ int main(int argc, char ** argv)
                 reporter.message(msg);
                 if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
                     if (msg.getContent() == EOS) {
+                        cerr << "FIXME eos" << endl;
                         done = true;
                     } else {
                         ++count;
@@ -224,7 +226,10 @@ int main(int argc, char ** argv)
                         }
                         if (opts.printContent)
                             std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
-                        if (opts.messages && count >= opts.messages) done = true;
+                        if (opts.messages && count >= opts.messages) {
+                            cerr << "FIXME "<< count << " >= " << opts.messages << endl;
+                            done = true;
+                        }
                     }
                 } else if (opts.checkRedelivered && !msg.getRedelivered()) {
                     throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");

Modified: qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml?rev=1171756&r1=1171755&r2=1171756&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml Fri Sep 16 20:16:53 2011
@@ -361,6 +361,7 @@
     <control name="release" code="0x6">
       <field name="queue" type="queue.name"/>
       <field name="position" type="uint32"/>
+      <field name="redelivered" type="bit"/>
     </control>
 
   </class>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org