You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/02 22:03:24 UTC

svn commit: r1209690 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Fri Dec  2 21:03:23 2011
New Revision: 1209690

URL: http://svn.apache.org/viewvc?rev=1209690&view=rev
Log:
QPID-3603: Fix replication of dequeues.

- Set acquire=false when creating a ReplicatingSubscription.
- Cleaned up string literals & other cosmetic improvemets.
- Consistent find/get for broker::QueueRegistry and ExchangeRegistry.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Dec  2 21:03:23 2011
@@ -87,12 +87,19 @@ void ExchangeRegistry::destroy(const str
     }
 }
 
-Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+Exchange::shared_ptr ExchangeRegistry::find(const string& name){
     RWlock::ScopedRlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i == exchanges.end())
-        throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
-    return i->second;
+        return Exchange::shared_ptr();
+    else
+        return i->second;
+}
+
+Exchange::shared_ptr ExchangeRegistry::get(const string& name) {
+    Exchange::shared_ptr ex = find(name);
+    if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: "<<name));
+    return ex;
 }
 
 bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Fri Dec  2 21:03:23 2011
@@ -54,10 +54,20 @@ class ExchangeRegistry{
        bool durable,
        const qpid::framing::FieldTable& args = framing::FieldTable());
     QPID_BROKER_EXTERN void destroy(const std::string& name);
-    QPID_BROKER_EXTERN Exchange::shared_ptr get(const std::string& name);
     Exchange::shared_ptr getDefault();
 
     /**
+     * Find the named exchange. Return 0 if not found.
+     */
+    QPID_BROKER_EXTERN boost::shared_ptr<Exchange> find(const std::string& name);
+
+    /**
+     * Get the named exchange. Throw exception if not found.
+     */
+    QPID_BROKER_EXTERN boost::shared_ptr<Exchange> get(const std::string& name);
+
+
+    /**
      * Register the manageable parent for declared exchanges
      */
     void setParent (management::Manageable* _parent) { parent = _parent; }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Dec  2 21:03:23 2011
@@ -23,6 +23,7 @@
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
 #include <sstream>
 #include <assert.h>
 
@@ -84,7 +85,6 @@ void QueueRegistry::destroy (const strin
 Queue::shared_ptr QueueRegistry::find(const string& name){
     RWlock::ScopedRlock locker(lock);
     QueueMap::iterator i = queues.find(name);
-    
     if (i == queues.end()) {
         return Queue::shared_ptr();
     } else {
@@ -92,6 +92,12 @@ Queue::shared_ptr QueueRegistry::find(co
     }
 }
 
+Queue::shared_ptr QueueRegistry::get(const string& name) {
+    Queue::shared_ptr q = find(name);
+    if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+    return q;
+}
+
 string QueueRegistry::generateName(){
     string name;
     do {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/QueueRegistry.h Fri Dec  2 21:03:23 2011
@@ -97,6 +97,11 @@ class QueueRegistry {
     QPID_BROKER_EXTERN boost::shared_ptr<Queue> find(const std::string& name);
 
     /**
+     * Get the named queue. Throw exception if not found.
+     */
+    QPID_BROKER_EXTERN boost::shared_ptr<Queue> get(const std::string& name);
+
+    /**
      * Generate unique queue name.
      */
     std::string generateName();

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Dec  2 21:03:23 2011
@@ -35,12 +35,16 @@
 
 namespace {
 const std::string QPID_REPLICATOR_("qpid.replicator-");
+const std::string TYPE_NAME("qpid.queue-replicator");
 }
 
 namespace qpid {
 namespace ha {
 using namespace broker;
 
+// FIXME aconway 2011-12-02: separate file for string constantS?
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
     : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
       queue(q), link(l), current(queue->getPosition())
@@ -72,9 +76,7 @@ void QueueReplicator::initializeBridge(B
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     framing::FieldTable settings;
-    // FIXME aconway 2011-11-28: string constants.
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
-    // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
     settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
     qpid::framing::SequenceNumber oldest;
     if (queue->getOldest(oldest))
@@ -86,15 +88,9 @@ void QueueReplicator::initializeBridge(B
     QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
 }
 
-
-namespace {
-const std::string DEQUEUE_EVENT("dequeue-event");
-const std::string REPLICATOR("qpid.replicator-");
-}
-
 void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
 {
-    if (key == DEQUEUE_EVENT) {
+    if (key == DEQUEUE_EVENT_KEY) {
         std::string content;
         msg.getMessage().getFrames().getContent(content);
         qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
@@ -115,6 +111,7 @@ void QueueReplicator::route(Deliverable&
                     QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
                 } else {
                     // FIXME aconway 2011-11-29: error handling
+                    // Is this an error? Will happen if queue has initial dequeues.
                     QPID_LOG(error, "HA: Unable to dequeue message at "
                              << QueuePos(queue.get(), *i));
                 }
@@ -136,10 +133,6 @@ void QueueReplicator::route(Deliverable&
 bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
 bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
 bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-// FIXME aconway 2011-11-28: rationalise string constants.
-static const std::string TYPE_NAME("qpid.queue-replicator");
-
 std::string QueueReplicator::getType() const { return TYPE_NAME; }
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Dec  2 21:03:23 2011
@@ -49,6 +49,8 @@ namespace ha {
 class QueueReplicator : public broker::Exchange
 {
   public:
+    static const std::string DEQUEUE_EVENT_KEY;
+    
     QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
     ~QueueReplicator();
     std::string getType() const;

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Dec  2 21:03:23 2011
@@ -36,11 +36,13 @@ using namespace std;
 // FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
 // Do we want a common HA prefix?
 const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
 
+namespace {
 const string DOLLAR("$");
 const string INTERNAL("-internal");
+} // namespace
 
 class ReplicationStateInitialiser
 {
@@ -80,15 +82,23 @@ ReplicatingSubscription::Factory::create
     const string& name,
     Queue::shared_ptr queue,
     bool ack,
-    bool acquire,
+    bool /*acquire*/,
     bool exclusive,
     const string& tag,
     const string& resumeId,
     uint64_t resumeTtl,
     const framing::FieldTable& arguments
 ) {
-    return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
-        new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+    boost::shared_ptr<ReplicatingSubscription> rs;
+    if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+        // FIXME aconway 2011-12-01: ignoring acquire param and setting acquire
+        // false. Should this be done in the caller? Remove from ctor parameters.
+        rs.reset(new ReplicatingSubscription(
+                     parent, name, queue, ack, false, exclusive, tag,
+                     resumeId, resumeTtl, arguments));
+        queue->addObserver(rs);
+    }
+    return rs;
 }
 
 ReplicatingSubscription::ReplicatingSubscription(
@@ -108,12 +118,11 @@ ReplicatingSubscription::ReplicatingSubs
     consumer(new DelegatingConsumer(*this))
 {
     QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
-    // FIXME aconway 2011-11-25: string constants.
-    if (arguments.isSet("qpid.high_sequence_number")) {
-        qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
+    if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
+        qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
         qpid::framing::SequenceNumber lwm;
-        if (arguments.isSet("qpid.low_sequence_number")) {
-            lwm = arguments.getAsInt("qpid.low_sequence_number");
+        if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) {
+            lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER);
         } else {
             lwm = hwm;
         }
@@ -159,6 +168,7 @@ void ReplicatingSubscription::enqueued(c
     m.payload->getIngressCompletion().startCompleter();
 }
 
+// Called with lock held.
 void ReplicatingSubscription::generateDequeueEvent()
 {
     string buf(range.encodedSize(),'\0');
@@ -186,11 +196,14 @@ void ReplicatingSubscription::generateDe
     event->getFrames().append(content);
 
     DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    props->setRoutingKey("dequeue-event");
-
+    props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY);
     events->deliver(event);
 }
 
+// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this?
+// If a queue is drained with no new messages coming on 
+// will the messages be dequeued on the backup?
+
 //called after the message has been removed from the deque and under
 //the message lock in the queue
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Dec  2 21:03:23 2011
@@ -22,6 +22,7 @@
  *
  */
 
+#include "QueueReplicator.h"    // For DEQUEUE_EVENT_KEY
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/ConsumerFactory.h"

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Dec  2 21:03:23 2011
@@ -109,7 +109,6 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-// FIXME aconway 2011-11-24: this should be a class.
 enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
 const string S_NONE="none";
 const string S_WIRING="wiring";
@@ -216,6 +215,7 @@ void WiringReplicator::initializeBridge(
     QPID_LOG(debug, "HA: Activated wiring replicator")
 }
 
+// FIXME aconway 2011-12-02: error  handling in route. Be forging but log warnings?
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
     Variant::List list;
     try {
@@ -329,7 +329,7 @@ void WiringReplicator::doEventExchangeDe
 void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
     string name = values[EXNAME].asString();
     try {
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
         if (exchange && replicateLevel(exchange->getArgs())) {
             QPID_LOG(debug, "HA: Deleting exchange:" << name);
             broker.deleteExchange(
@@ -341,20 +341,23 @@ void WiringReplicator::doEventExchangeDe
 }
 
 void WiringReplicator::doEventBind(Variant::Map& values) {
-    try {
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
-        boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
-        // We only replicated a binds for a replicated queue to replicated exchange.
-        if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
-            framing::FieldTable args;
-            amqp_0_10::translate(values[ARGS].asMap(), args);
-            string key = values[KEY].asString();
-            QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
-                     << " queue=" << queue->getName()
-                     << " key=" << key);
-            exchange->bind(queue, key, &args);
-        }
-    } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+    boost::shared_ptr<Exchange> exchange =
+        broker.getExchanges().find(values[EXNAME].asString());
+    boost::shared_ptr<Queue> queue =
+        broker.getQueues().find(values[QNAME].asString());
+    // We only replicate binds for a replicated queue to replicated
+    // exchange that both exist locally.
+    if (exchange && replicateLevel(exchange->getArgs()) &&
+        queue && replicateLevel(queue->getSettings()))
+    {
+        framing::FieldTable args;
+        amqp_0_10::translate(values[ARGS].asMap(), args);
+        string key = values[KEY].asString();
+        QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+                 << " queue=" << queue->getName()
+                 << " key=" << key);
+        exchange->bind(queue, key, &args);
+    }
 }
 
 void WiringReplicator::doResponseQueue(Variant::Map& values) {
@@ -424,26 +427,24 @@ const std::string QUEUE_REF("queueRef");
 } // namespace
 
 void WiringReplicator::doResponseBind(Variant::Map& values) {
-    try {
-        std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
-        std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
-        boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
-        // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
-
-        // Automatically replicate exchange if queue and exchange are replicated
-        if (exchange && replicateLevel(exchange->getArgs()) &&
-            queue && replicateLevel(queue->getSettings()))
-        {
-            framing::FieldTable args;
-            amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-            string key = values[KEY].asString();
-            QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
-                     << " queue=" << queue->getName()
-                     << " key=" << key);
-            exchange->bind(queue, key, &args);
-        }
-    } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
+    std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+    std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
+    boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+    // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+    // Automatically replicate binding if queue and exchange exist and are replicated
+    if (exchange && replicateLevel(exchange->getArgs()) &&
+        queue && replicateLevel(queue->getSettings()))
+    {
+        framing::FieldTable args;
+        amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+        string key = values[KEY].asString();
+        QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+                 << " queue=" << queue->getName()
+                 << " key=" << key);
+        exchange->bind(queue, key, &args);
+    }
 }
 
 void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1209690&r1=1209689&r2=1209690&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Fri Dec  2 21:03:23 2011
@@ -62,7 +62,11 @@ class ShortTests(BrokerTest):
             return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
         def setup(p, prefix):
             """Create config, send messages on the primary p"""
-            p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+            s = p.sender(queue(prefix+"q1", "all"))
+            for m in ["a", "b", "1"]: s.send(Message(m))
+            # Test replication of dequeue
+            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") 
+            p.acknowledge()
             p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
             p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
@@ -70,13 +74,18 @@ class ShortTests(BrokerTest):
             # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
             p.sender(queue(prefix+"x", "wiring"))
 
-        def verify(b, prefix):
+        def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
+
             # FIXME aconway 2011-11-21: wait for wiring to replicate.
             self.wait(b, prefix+"x");
-            # Verify backup
             # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
-            self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+            self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
+
+            # FIXME aconway 2011-12-02: 
+            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") 
+            p.acknowledge()
+
             self.assert_browse_retry(b, prefix+"q2", []) # wiring only
             self.assert_missing(b, prefix+"q3")
             b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
@@ -84,18 +93,18 @@ class ShortTests(BrokerTest):
             b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
-        # Create config, send messages before starting the backup, to test catch-up replication.
         primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
         p = primary.connect().session()
+        # Create config, send messages before starting the backup, to test catch-up replication.
         setup(p, "1")
-        # Start the backup
         backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
-        b = backup.connect().session()
-        verify(b, "1")
-
         # Create config, send messages after starting the backup, to test steady-state replication.
         setup(p, "2")
-        verify(b, "2")
+
+        # Verify the data on the backup
+        b = backup.connect().session()
+        verify(b, "1", p)
+        verify(b, "2", p)
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org