You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/10/18 21:36:13 UTC

svn commit: r1023966 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ tests/

Author: aconway
Date: Mon Oct 18 19:36:13 2010
New Revision: 1023966

URL: http://svn.apache.org/viewvc?rev=1023966&view=rev
Log:
Introduce broker::Cluster interface.

See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt.

qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies
the broker makes the expected calls on broker::Cluster in various situations.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt   (with props)
    qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 18 19:36:13 2010
@@ -501,6 +501,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Broker.cpp \
   qpid/broker/Broker.h \
   qpid/broker/BrokerImportExport.h \
+  qpid/broker/Cluster.h \
   qpid/broker/Connection.cpp \
   qpid/broker/Connection.h \
   qpid/broker/ConnectionFactory.cpp \
@@ -559,6 +560,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/MessageStoreModule.h \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
+  qpid/broker/NullCluster.h \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/NullMessageStore.h \
   qpid/broker/OwnershipToken.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 18 19:36:13 2010
@@ -24,6 +24,7 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/broker/SaslAuthenticator.h"
@@ -146,6 +147,7 @@ Broker::Broker(const Broker::Options& co
                                                           conf.qmf2Support)
                                     : 0),
     store(new NullMessageStore),
+    cluster(new NullCluster),
     acl(0),
     dataDir(conf.noDataDir ? std::string() : conf.dataDir),
     queues(this),

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Oct 18 19:36:13 2010
@@ -70,6 +70,7 @@ namespace broker {
 
 class ExpiryPolicy;
 class Message;
+class Cluster;
 
 static const  uint16_t DEFAULT_PORT=5672;
 
@@ -153,6 +154,7 @@ public:
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
     std::auto_ptr<MessageStore> store;
+    std::auto_ptr<Cluster> cluster;
     AclModule* acl;
     DataDir dataDir;
 
@@ -273,6 +275,9 @@ public:
     void setClusterUpdatee(bool set) { clusterUpdatee = set; }
     bool isClusterUpdatee() const { return clusterUpdatee; }
 
+    QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
+    QPID_BROKER_EXTERN Cluster& getCluster() { return *cluster; }
+
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
     
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h Mon Oct 18 19:36:13 2010
@@ -0,0 +1,103 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ */
+class Cluster
+{
+  public:
+    virtual ~Cluster() {}
+
+    // Messages
+
+    /** In Exchange::route, before the message is enqueued. */
+    virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+    /** A message is delivered to a queue. */
+    virtual void enqueue(QueuedMessage&) = 0;
+    /** In Exchange::route, after all enqueues for the message. */
+    virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+    /** A message is acquired by a local consumer, it is unavailable to replicas. */
+    virtual void acquire(const QueuedMessage&) = 0;
+    /** A locally-acquired message is accepted, it is removed from all replicas. */
+    virtual void accept(const QueuedMessage&) = 0;
+
+    /** A locally-acquired message is rejected, and may be re-routed. */
+    virtual void reject(const QueuedMessage&) = 0;
+    /** Re-routing (if any) is complete for a rejected message. */
+    virtual void rejected(const QueuedMessage&) = 0;
+
+    /** A locally-acquired message is released by the consumer and re-queued. */
+    virtual void release(const QueuedMessage&) = 0;
+    /** A message is dropped from the queue, e.g. expired or replaced on an LVQ.
+     * This function does only local book-keeping, it does not multicast.
+     * It is reasonable to call with a queue lock held.
+     */
+    virtual void dequeue(const QueuedMessage&) = 0;
+
+    // Consumers
+
+    /** A new consumer subscribes to a queue. */
+    virtual void consume(const Queue&, size_t consumerCount) = 0;
+    /** A consumer cancels its subscription to a queue */
+    virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+    // Wiring
+
+    /** A queue is created */
+    virtual void create(const Queue&) = 0;
+    /** A queue is destroyed */
+    virtual void destroy(const Queue&) = 0;
+    /** An exchange is created */
+    virtual void create(const Exchange&) = 0;
+    /** An exchange is destroyed */
+    virtual void destroy(const Exchange&) = 0;
+    /** A binding is created */
+    virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CLUSTER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Oct 18 19:36:13 2010
@@ -112,7 +112,7 @@ void DeliveryRecord::complete()  {
 
 bool DeliveryRecord::accept(TransactionContext* ctxt) {
     if (acquired && !ended) {
-        queue->dequeue(ctxt, msg);
+        queue->accept(ctxt, msg);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
     }
@@ -130,19 +130,8 @@ void DeliveryRecord::committed() const{
 }
 
 void DeliveryRecord::reject() 
-{    
-    Exchange::shared_ptr alternate = queue->getAlternateExchange();
-    if (alternate) {
-        DeliverableMessage delivery(msg.payload);
-        alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
-        QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " 
-                 << alternate->getName());
-    } else {
-        //just drop it
-        QPID_LOG(info, "Dropping rejected message from " << queue->getName());
-    }
-
-    dequeue();
+{
+    queue->reject(msg);
 }
 
 uint32_t DeliveryRecord::getCredit() const
@@ -156,7 +145,7 @@ void DeliveryRecord::acquire(DeliveryIds
         results.push_back(id);
         if (!acceptExpected) {
             if (ended) { QPID_LOG(error, "Can't dequeue ended message"); }
-            else { queue->dequeue(0, msg); setEnded(); }
+            else { queue->accept(0, msg); setEnded(); }
         }
     } else {
         QPID_LOG(info, "Message already acquired " << id.getValue());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 18 19:36:13 2010
@@ -22,6 +22,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
@@ -78,10 +79,23 @@ Exchange::PreRoute::~PreRoute(){
     }
 }
 
+// Bracket a scope with calls to Cluster::routing and Cluster::routed
+struct ScopedClusterRouting {
+    Broker* broker;
+    boost::intrusive_ptr<Message> message;
+    ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
+        : broker(b), message(m) {
+        if (broker) broker->getCluster().routing(message);
+    }
+    ~ScopedClusterRouting() {
+        if (broker) broker->getCluster().routed(message);
+    }
+};
+
 void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
 {
+    ScopedClusterRouting scr(broker, &msg.getMessage());
     int count = 0;
-
     if (b.get()) {
         // Block the content release if the message is transient AND there is more than one binding
         if (!msg.getMessage().isPersistent() && b->size() > 1) {

Added: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h Mon Oct 18 19:36:13 2010
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * No-op implementation of Cluster interface, installed by broker when
+ * no cluster plug-in is present or clustering is disabled.
+ */
+class NullCluster : public Cluster
+{
+  public:
+
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<Message>&) {}
+    virtual void enqueue(QueuedMessage&) {}
+    virtual void routed(const boost::intrusive_ptr<Message>&) {}
+    virtual void acquire(const QueuedMessage&) {}
+    virtual void accept(const QueuedMessage&) {}
+    virtual void reject(const QueuedMessage&) {}
+    virtual void rejected(const QueuedMessage&) {}
+    virtual void release(const QueuedMessage&) {}
+    virtual void dequeue(const QueuedMessage&) {}
+
+    // Consumers
+
+    virtual void consume(const Queue&, size_t) {}
+    virtual void cancel(const Queue&, size_t) {}
+
+    // Wiring
+
+    virtual void create(const Queue&) {}
+    virtual void destroy(const Queue&) {}
+    virtual void create(const Exchange&) {}
+    virtual void destroy(const Exchange&) {}
+    virtual void bind(const Queue&, const Exchange&, const std::string&, const framing::FieldTable&) {}
+};
+
+}} // namespace qpid::broker
+
+#endif

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 18 19:36:13 2010
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
@@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage&
             }
         }
     }
+    if (broker) broker->getCluster().release(msg);
     copy.notify();
 }
 
@@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMe
     }
 }
 
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. The calling function should set qmsg
+// to the acquired message.
+struct ClusterAcquireOnExit {
+    Broker* broker;
+    QueuedMessage qmsg;
+    ClusterAcquireOnExit(Broker* b) : broker(b) {}
+    ~ClusterAcquireOnExit() {
+        if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+    }
+};
+
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
 {
+    ClusterAcquireOnExit willAcquire(broker);
+
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const Seque
         if (lastValueQueue) {
             clearLVQIndex(*i);
         }
-        QPID_LOG(debug,
-                 "Acquired message at " << i->position << " from " << name);
+        QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+        willAcquire.qmsg = *i;
         messages.erase(i);
         return true;
-    } 
+    }
     QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
     return false;
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
+    ClusterAcquireOnExit acquire(broker);
+
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
 
@@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage&
     Messages::iterator i = findAt(msg.position); 
     if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
         (!lastValueQueue ||
-        (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
-        )  {
+         (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+    )  {
 
         clearLVQIndex(msg);
         QPID_LOG(debug,
                  "Match found, acquire succeeded: " <<
                  i->position << " == " << msg.position);
+        acquire.qmsg = *i;
         messages.erase(i);
         return true;
-    } 
+    }
     
     QPID_LOG(debug, "Acquire failed for " << msg.position);
     return false;
@@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
+        ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+
         Mutex::ScopedLock locker(messageLock);
         if (messages.empty()) { 
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {            
                     m = msg;
+                    willAcquire.qmsg = msg;
                     popMsg(msg);
                     return CONSUMED;
                 } else {
@@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber
     return QueuedMessage();
 }
 
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
     assertClusterSafe();
-    Mutex::ScopedLock locker(consumerLock);
-    if(exclusive) {
-        throw ResourceLockedException(
-            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-    } else if(requestExclusive) {
-        if(consumerCount) {
+    size_t consumers;
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        if(exclusive) {
             throw ResourceLockedException(
-                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-        } else {
-            exclusive = c->getSession();
+                QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+        } else if(requestExclusive) {
+            if(consumerCount) {
+                throw ResourceLockedException(
+                    QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+            } else {
+                exclusive = c->getSession();
+            }
         }
+        consumers = ++consumerCount;
+        if (mgmtObject != 0)
+            mgmtObject->inc_consumerCount ();
     }
-    consumerCount++;
-    if (mgmtObject != 0)
-        mgmtObject->inc_consumerCount ();
+    if (broker) broker->getCluster().consume(*this, consumers);
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
-    Mutex::ScopedLock locker(consumerLock);
-    consumerCount--;
-    if(exclusive) exclusive = 0;
-    if (mgmtObject != 0)
-        mgmtObject->dec_consumerCount ();
+    size_t consumers;
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        consumers = --consumerCount;
+        if(exclusive) exclusive = 0;
+        if (mgmtObject != 0)
+            mgmtObject->dec_consumerCount ();
+    }
+    if (broker) broker->getCluster().cancel(*this, consumers);
 }
 
 QueuedMessage Queue::get(){
+    ClusterAcquireOnExit acquire(broker); // Outside lock
+
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
 
     if(!messages.empty()){
         msg = getFront();
+        acquire.qmsg = msg;
         popMsg(msg);
     }
     return msg;
@@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg)
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
+    QueuedMessage qm;
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
-        QueuedMessage qm(this, msg, ++sequence);
+        qm = QueuedMessage(this, msg, ++sequence);
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
@@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Me
                 boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
                 if (!old) old = i->second;
                 i->second->setReplacementMessage(msg,this);
+                // FIXME aconway 2010-10-15: it is incorrect to use qm.position below
+                // should be using the position of the message being replaced.
                 if (isRecovery) {
                     //can't issue new requests for the store until
                     //recovery is complete
                     pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
                 } else {
-                    Mutex::ScopedUnlock u(messageLock);   
+                    Mutex::ScopedUnlock u(messageLock);
                     dequeue(0, QueuedMessage(qm.queue, old, qm.position));
                 }
             }           
@@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Me
         }
     }
     copy.notify();
+    if (broker) broker->getCluster().enqueue(qm);
 }
 
 QueuedMessage Queue::getFront()
@@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrus
     if (policy.get()) policy->enqueueAborted(msg);       
 }
 
+void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
+    if (broker) broker->getCluster().accept(msg);
+    dequeue(ctxt, msg);
+}
+
+struct ScopedClusterReject {
+    Broker* broker;
+    const QueuedMessage& qmsg;
+    ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) {
+        if (broker) broker->getCluster().reject(qmsg);
+    }
+    ~ScopedClusterReject() {
+        if (broker) broker->getCluster().rejected(qmsg);
+    }
+};
+
+void Queue::reject(const QueuedMessage &msg) {
+    ScopedClusterReject scr(broker, msg);
+    Exchange::shared_ptr alternate = getAlternateExchange();
+    if (alternate) {
+        DeliverableMessage delivery(msg.payload);
+        alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+        QPID_LOG(info, "Routed rejected message from " << getName() << " to " 
+                 << alternate->getName());
+    } else {
+        //just drop it
+        QPID_LOG(info, "Dropping rejected message from " << getName());
+    }
+    dequeue(0, msg);
+}
+
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
-
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
@@ -846,6 +913,9 @@ void Queue::popAndDequeue()
  */
 void Queue::dequeued(const QueuedMessage& msg)
 {
+    // Note: Cluster::dequeued does only local book-keeping, no multicast
+    // So OK to call here with lock held.
+    if (broker) broker->getCluster().dequeue(msg);
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
     if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _se
         store->create(*this, _settings);
     }
     configure(_settings);
+    if (broker) broker->getCluster().create(*this);
 }
 
 void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -934,6 +1005,7 @@ void Queue::destroy()
         store->destroy(*this);
         store = 0;//ensure we make no more calls to the store for this queue
     }
+    if (broker) broker->getCluster().destroy(*this);
 }
 
 void Queue::notifyDeleted()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 18 19:36:13 2010
@@ -259,6 +259,13 @@ class Queue : public boost::enable_share
 
     bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
     void enqueueAborted(boost::intrusive_ptr<Message> msg);
+
+    /** Message acknowledged, dequeue it. */
+    QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg);
+
+    /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */
+    QPID_BROKER_EXTERN void reject(const QueuedMessage &msg);
+    
     /**
      * dequeue from store (only done once messages is acknowledged)
      */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Mon Oct 18 19:36:13 2010
@@ -34,10 +34,9 @@ struct QueuedMessage
     framing::SequenceNumber position;
     Queue* queue;
 
-    QueuedMessage() : queue(0) {}
+    QueuedMessage(Queue* q=0) : position(0), queue(q) {}
     QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
         payload(msg), position(sn), queue(q) {}
-    QueuedMessage(Queue* q) : queue(q) {}
     
 };
     inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 18 19:36:13 2010
@@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::delive
         parent->record(record);
     } 
     if (acquire && !ackExpected) {
-        queue->dequeue(0, msg);
+        queue->accept(0, msg);
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
@@ -347,11 +347,6 @@ bool SemanticState::ConsumerImpl::filter
 bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
 {
     assertClusterSafe();
-    // FIXME aconway 2009-06-08: if we have byte & message credit but
-    // checkCredit fails because the message is to big, we should
-    // remain on queue's listener list for possible smaller messages
-    // in future.
-    // 
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt Mon Oct 18 19:36:13 2010
@@ -75,6 +75,8 @@ Use a moving queue ownership protocol to
 than relying on identical state and lock-step behavior to cause
 identical dequeues on each broker.
 
+Clearly defined interface between broker code and cluster plug-in.
+
 *** Requirements
 
 The cluster must provide these delivery guarantees: 
@@ -365,3 +367,4 @@ there a better term?
 Clustering and scalability: new design may give us the flexibility to
 address scalability as part of cluster design. Think about
 relationship to federation and "fragmented queues" idea.
+

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt Mon Oct 18 19:36:13 2010
@@ -0,0 +1,439 @@
+-*-org-*-
+Notes on new cluster implementation. See also: new-cluster-design.txt
+
+* Implementation plan.
+  
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
+
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- measure performance.
+
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
+
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protoocl etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster ID's to messages recovered from store, don't replicate.
+
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
+
+* Task list
+
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued other members.
+- acquire then reject: verify goes on alt-exchange once only.
+
+*** TODO broker::Cluster interface and call points.
+
+Initial draft is commited. 
+
+Issues to review:
+
+queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue
+when messages are pushed. How to reconcile with queue ownership?
+
+rejecting messages: if there's an alternate exchange where do we do the
+re-routing? On origin broker or on all brokers?
+
+Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
+Intercepting client actions on the queue vs. internal actions
+(e.g. ring policy)
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and DeliverHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both DeliverHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path" error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; Position pos; }
+- SequenceNumber 64 bit sequence number to identify messages.
+- NodeId 64 bit CPG node-id, identifies member of the cluster.
+- struct MessageId { NodeId node; SequenceNumber seq; }
+
+Members:
+- atomic<SequenceNumber> sequence // sequence number for message IDs.
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+
+NOTE: localMessage is also modified by DeliverHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg)
+  if noReplicate: return
+  # Supress everything except enqueues while we are routing.
+  # We don't want to replicate acquires & dequeues caused by an enqueu,
+  # e.g. removal of messages from ring/LV queues.
+  isRouting = true 
+
+enqueue(qmsg):
+  if noReplicate: return
+  if !qmsg.msg.id:
+    seq = sequence++
+    qmsg.msg.id = (self,seq)
+    localMessage[seq] = qmsg
+    mcast create(encode(qmsg.msg),seq)
+  mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+
+routed(msg):
+  if noReplicate: return
+  if msg.id: mcast routed(msg.id.seq)
+  isRouting = false
+
+acquire(qmsg):
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  if msg.id: mcast acquire(msg.id, q)
+
+release(QueuedMessage) 
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  if msg.id: mcast release(id, q)
+
+accept(QueuedMessage):
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  if msg.id: mcast dequeue(msg.id, msg.q)
+
+reject(QueuedMessage):
+  isRejecting = true
+  if msg.id: mcast reject(msg.id, msg.q)
+
+rejected(QueuedMessage):
+  isRejecting = false
+  mcast dequeue(msg.id, msg.q)
+
+dequeue(QueuedMessage) 
+  # No mcast in dequeue, only used for local cleanup of resources. 
+  # E.g. messages that are replaced on an LVQ are dequeued without being
+  # accepted or rejected. dequeue is called with the queue lock held
+  # FIXME revisit - move it out of the queue lock.
+  cleanup(msg)
+
+*** DeliverHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq)
+  if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+  id = (sender,seq)
+  if sender == self:
+    enqueued[id,q] = (localMessage[seq], acquired=None)
+  else:
+    msg = sender.routing[seq]
+    enqueued[id,q] = (qmsg, acquired=None)
+    with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+
+routed(seq):
+  if sender == self: localMessage.erase(msg.id.seq)
+  else: sender.routing.erase(seq)
+
+acquire(id,q):
+  enqueued[id,q].acquired = sender
+  node[sender].acquired.push_back((id,q))
+  if sender != self:
+    with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q)
+  enqueued[id,q].acquired = None
+  node[sender].acquired.erase((id,q))
+  if sender != self
+    with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+  sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q)
+  sender.routing.erase[id]
+
+dequeue(id,q)
+  entry = enqueued[id,q]
+  enqueued.erase[id,q]
+  node[entry.acquired].acquired.erase(id,q)
+  if sender != self:
+    with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+  for key in node[m].acquired:
+   release(key.id, key.q)
+  node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+  consumersStopped = true;
+  while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+  consumersStopped = false;
+  listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+   if (consumersStopped) return false;
+   ++consumersBusy;
+   do_regular_dispatch_body()
+   if (--consumersBusy == 0) consumersBusyMonitor.notify();
+}
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node in node-id order (treating nodes as a circular list)
+starting from the old owner that is interested in the queue.
+
+Queue consumers initially are stopped, only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+  bool owned;
+  Timer timer;
+  BrokerQueue q;
+
+  drop(): # locked
+    if owned:
+      owned = false
+      q.stopConsumers()
+      mcast release(q.name, false)
+      timer.stop()
+
+  take(): # locked
+    if not owned:
+      owned = true
+      q.startConsumers()
+      timer.start(timeout)
+
+  timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+  if sender==self and consumers[q].empty(): ownership[q].take()
+  consumers[q].insert(sender)
+
+release(q):
+  asssert(owner[q] == sender and owner[q] in consumers[q])
+  owner[q] = circular search from sender in consumers[q]
+  if owner==self: ownership[q].take()
+
+cancel(q):
+  assert(queue[q].owner != sender) # sender must release() before cancel()
+  consumers[q].erase(sender)
+
+member-leaves:
+  for q in queue: if owner[q] = left: left.release(q)
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+  if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+  if consumerCount == 0:
+    ownership[q].drop()
+  mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Need to be able to multicast large messages in fragments
+
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protoocl etc. to support persistent start-up (existing code)
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are take off a queue.
+
+NOTE: Not controlled by queue lock, how to make them consistent?
+
+Target broker may not have all messages on other brokers for purge/destroy.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on broker ?
+- Queue::destroy() - messages to alternate exchange on all brokers.?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+   
+Need to bound the size of the internal queues holding cluster events & frames.
+- stop polling when we reach bound.
+- start polling when we get back under it.
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+   
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages.  Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is  updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "Queue observers" with intercept
+points in similar places.
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code.
+
+In particular QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp Mon Oct 18 19:36:13 2010
@@ -0,0 +1,435 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+///@file
+// Tests using a dummy broker::Cluster implementation to verify the expected
+// Cluster functions are called for various actions on the broker.
+//
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Duration.h"
+#include "BrokerFixture.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+#include <boost/regex.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace boost::assign;
+using namespace qpid::messaging;
+using boost::format;
+using boost::regex;
+
+namespace qpid {
+namespace tests {
+
+class DummyCluster : public broker::Cluster
+{
+  private:
+    /** Flag used to ignore events other than enqueues while routing,
+     * e.g. acquires and accepts generated in a ring queue to replace an element..
+     * In real impl would be a thread-local variable.
+     */
+    bool isRouting;
+    
+    void recordQm(const string& op, const broker::QueuedMessage& qm) {
+        history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
+                    % qm.position % qm.payload->getFrames().getContent()).str();
+    }
+    void recordStr(const string& op, const string& name) {
+        history += (format("%s(%s)") % op % name).str();
+    }
+  public:
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
+        isRouting = true;
+        history += (format("routing(%s)") % m->getFrames().getContent()).str();
+    }
+
+    virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); }
+
+    virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
+        history += (format("routed(%s)") % m->getFrames().getContent()).str();
+        isRouting = false;
+    }
+    virtual void acquire(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("acquire", qm);
+    }
+    virtual void accept(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("accept", qm);
+    }
+    virtual void reject(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("reject", qm);
+    }
+    virtual void rejected(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("rejected", qm);
+    }
+    virtual void release(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("release", qm);
+    }
+    virtual void dequeue(const broker::QueuedMessage& qm) {
+        // Never ignore dequeue, used to avoid resource leaks.
+        recordQm("dequeue", qm);
+    }
+
+    // Consumers
+
+    virtual void consume(const broker::Queue& q, size_t n) {
+        history += (format("consume(%s, %d)") % q.getName() % n).str();
+    }
+    virtual void cancel(const broker::Queue& q, size_t n) {
+        history += (format("cancel(%s, %d)") % q.getName() % n).str();
+    }
+
+    // Wiring
+
+    virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
+    virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
+    virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
+    virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
+    virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+        history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
+    }
+    vector<string> history;
+};
+
+QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
+
+// Broker fixture with DummyCluster set up and some new API client bits.
+struct DummyClusterFixture: public BrokerFixture {
+    Connection c;
+    Session s;
+    DummyCluster*dc;
+    DummyClusterFixture() {
+        broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
+        dc = &static_cast<DummyCluster&>(broker->getCluster());
+        c = Connection("localhost:"+lexical_cast<string>(getPort()));
+        c.open();
+        s = c.createSession();
+    }
+    ~DummyClusterFixture() {
+        c.close();
+    }
+};
+
+QPID_AUTO_TEST_CASE(testSimplePubSub) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    // Queue creation
+    Sender sender = f.s.createSender("q;{create:always,delete:always}");
+    int i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Consumer
+    Receiver receiver = f.s.createReceiver("q");
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Send message
+    sender.send(Message("a"));
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    // Don't check size here as it is uncertain whether acquire has happened yet.
+
+    // Acquire message
+    Message m = receiver.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Acknowledge message
+    f.s.acknowledge(true);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Close a consumer
+    receiver.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Destroy the queue
+    f.c.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testReleaseReject) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
+    sender.send(Message("a"));
+    Receiver receiver = f.s.createReceiver("q");
+    Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
+    Message m = receiver.fetch(Duration::SECOND);
+    h.clear();
+
+    // Explicit release
+    f.s.release(m);
+    f.s.sync();
+    int i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Implicit release on closing connection.
+    Connection c("localhost:"+lexical_cast<string>(f.getPort()));
+    c.open();
+    Session s = c.createSession();
+    Receiver r = s.createReceiver("q");
+    m = r.fetch(Duration::SECOND);
+    h.clear();
+    i = 0;
+    c.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Reject message, goes to alternate exchange.
+    m = receiver.fetch(Duration::SECOND);
+    h.clear();
+    i = 0;
+    f.s.reject(m);
+    BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); 
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); 
+    BOOST_CHECK_EQUAL(h.size(), i);
+    m = altReceiver.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(m.getContent(), "a");
+
+    // Timed out message
+    h.clear();
+    i = 0;
+    m = Message("t");
+    m.setTtl(Duration(1));                // Timeout 1ms
+    sender.send(m);
+    usleep(2000);               // Sleep 2ms
+    bool received = receiver.fetch(m, Duration::IMMEDIATE);
+    BOOST_CHECK(!received);                     // Timed out
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Message replaced on LVQ
+    sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
+    m = Message("a");
+    m.getProperties()["qpid.LVQ_key"] = "foo";
+    sender.send(m);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    m = Message("b");
+    m.getProperties()["qpid.LVQ_key"] = "foo";
+    sender.send(m);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    // FIXME: bug in Queue.cpp gives the incorrect position when
+    // dequeueing a replaced LVQ message.
+    // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    receiver = f.s.createReceiver("lvq");
+    BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
+    f.s.acknowledge(true);
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testFanout) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
+    Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
+    Sender sender = f.s.createSender("amq.fanout");
+    r1.setCapacity(0);          // Don't receive immediately.
+    r2.setCapacity(0);
+    h.clear();
+    int i = 0;
+
+    // Send message
+    sender.send(Message("a"));
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++));
+    BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++));
+    BOOST_CHECK(h.at(i-1) != h.at(i-2));
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Receive messages
+    Message m1 = r1.fetch(Duration::SECOND);
+    f.s.acknowledge(m1, true);
+    Message m2 = r2.fetch(Duration::SECOND);
+    f.s.acknowledge(m2, true);
+
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testRingQueue) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
+    // so we can't do this:
+    // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
+    // Must use old API to declare ring queue:
+    qpid::client::Connection c;
+    f.open(c);
+    qpid::client::Session s = c.newSession();
+    qpid::framing::FieldTable args;
+    args.setInt("qpid.max_size", 3);
+    args.setString("qpid.policy_type","ring");
+    s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
+    c.close();
+    Sender sender = f.s.createSender("ring");
+    
+    int i = 0;
+    // Send message
+    sender.send(Message("a"));
+    sender.send(Message("b"));
+    sender.send(Message("c"));
+    sender.send(Message("d"));
+    f.s.sync();
+
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
+
+    Receiver receiver = f.s.createReceiver("ring");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
+    f.s.acknowledge(true);
+
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
+
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testTransactions) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+    Session ts = f.c.createTransactionalSession();
+    Sender sender = ts.createSender("q;{create:always,delete:always}");
+    int i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    sender.send(Message("a"));
+    sender.send(Message("b"));
+    ts.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+    BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
+    ts.commit();
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // FIXME aconway 2010-10-18: As things stand the cluster is not
+    // compatible with transactions
+    // - enqueues occur after routing is complete.
+    // - no transaction context associated with messages in the Cluster interface.
+    // - no call to Cluster::accept in Queue::dequeueCommitted
+
+    Receiver receiver = ts.createReceiver("q");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+    ts.acknowledge();
+    ts.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+    ts.commit();
+    ts.sync();
+    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
+

Propchange: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Oct 18 19:36:13 2010
@@ -123,7 +123,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	Variant.cpp \
 	Address.cpp \
 	ClientMessage.cpp \
-	Qmf2.cpp
+	Qmf2.cpp \
+	BrokerClusterCalls.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Mon Oct 18 19:36:13 2010
@@ -77,7 +77,7 @@ cluster_test_SOURCES =				\
 	PartialFailure.cpp			\
 	ClusterFailover.cpp
 
-cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework -lboost_regex
 
 qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org