You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/21 23:34:51 UTC

svn commit: r1221920 - in /qpid/branches/qpid-3603/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Wed Dec 21 22:34:50 2011
New Revision: 1221920

URL: http://svn.apache.org/viewvc?rev=1221920&view=rev
Log:
QPID-3603: Clean up HA log messages.

- Reduce verbosity, drop unknown event messages.
- Lots of clarifications
- Fix minor test bug in ha_tests.py.

Removed:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Logging.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Logging.h
Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/ha.mk?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/ha.mk Wed Dec 21 22:34:50 2011
@@ -28,8 +28,6 @@ ha_la_SOURCES =					\
   qpid/ha/HaBroker.cpp				\
   qpid/ha/HaBroker.h				\
   qpid/ha/HaPlugin.cpp				\
-  qpid/ha/Logging.h				\
-  qpid/ha/Logging.cpp				\
   qpid/ha/Settings.h				\
   qpid/ha/QueueReplicator.h			\
   qpid/ha/QueueReplicator.cpp			\

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Wed Dec 21 22:34:50 2011
@@ -1323,7 +1323,7 @@ void Queue::query(qpid::types::Variant::
 void Queue::setPosition(SequenceNumber n) {
     Mutex::ScopedLock locker(messageLock);
     sequence = n;
-    QPID_LOG(info, "Set position to " << sequence << " on " << getName());
+    QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
 }
 
 SequenceNumber Queue::getPosition() {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Dec 21 22:34:50 2011
@@ -46,7 +46,7 @@ Url url(const std::string& s, const std:
 } // namespace
 
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : broker(b), 
+    : broker(b),
       clientUrl(url(s.clientUrl, "ha-client-url")),
       brokerUrl(url(s.brokerUrl, "ha-broker-url")),
       mgmtObject(0)
@@ -59,17 +59,17 @@ HaBroker::HaBroker(broker::Broker& b, co
         mgmtObject->set_status("solo");
         ma->addObject(mgmtObject);
     }
-    QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
-             << " broker-url=" << brokerUrl);
     // FIXME aconway 2011-11-22: temporary hack to identify primary.
-    if (s.brokerUrl != "primary")
-        backup.reset(new Backup(broker, s));
+    bool isPrimary = (s.brokerUrl == "primary");
+    QPID_LOG(notice, "HA: " << (isPrimary ? "Primary" : "Backup")
+             << " initialized: client-url=" << clientUrl
+             << " broker-url=" << brokerUrl);
+    if (!isPrimary) backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
 }
-
 HaBroker::~HaBroker() {}
 
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Dec 21 22:34:50 2011
@@ -21,7 +21,6 @@
 
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
-#include "Logging.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -32,6 +31,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
+#include <ostream>
 
 namespace {
 const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -47,12 +47,19 @@ using namespace framing;
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
 const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
 
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+    return QPID_REPLICATOR_ + queueName;
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) {
+    return o << "HA: Backup queue " << qr.queue->getName() << ": ";
+}
+
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
-    : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
-      queue(q), link(l)
+    : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
 {
-    QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
-    // Declare the replicator bridge.
+    QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+
     queue->getBroker()->getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -69,12 +76,15 @@ QueueReplicator::QueueReplicator(boost::
     );
 }
 
-QueueReplicator::~QueueReplicator() {}
+QueueReplicator::~QueueReplicator() {
+    // FIXME aconway 2011-12-21: causes race condition? Restore.
+//     queue->getBroker()->getLinks().destroy(
+//         link->getHost(), link->getPort(), queue->getName(), getName(), string());
+}
 
-// NB: This is called back ina broker connection thread when the
-// bridge is created.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
-    // No lock needed, no mutable member variables are used.
+// Called in a broker connection thread when the bridge is created.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
+{
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     framing::FieldTable settings;
@@ -91,11 +101,12 @@ void QueueReplicator::initializeBridge(B
     queue->setPosition(0);
 
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+    // TODO aconway 2011-12-19: optimize.
     settings.setInt(QPID_SYNC_FREQUENCY, 1);
     peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
+    QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
 }
 
 namespace {
@@ -115,34 +126,37 @@ void QueueReplicator::dequeue(SequenceNu
         QueuedMessage message;
         if (queue->acquireMessageAt(n, message)) {
             queue->dequeue(0, message);
-            QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
+            QPID_LOG(trace, *this << "Dequeued message "<< message.position);
         }
     }
 }
 
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable* /*args*/)
+// Called in the connection thread of the queue's bridge to the primary.
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
 {
     sys::Mutex::ScopedLock l(lock);
     if (key == DEQUEUE_EVENT_KEY) {
         SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
-        QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues);
+        QPID_LOG(trace, *this << "Received dequeues: " << dequeues);
         //TODO: should be able to optimise the following
         for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
             dequeue(*i, l);
     } else if (key == POSITION_EVENT_KEY) {
         SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+        QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition()
+                 << " to " << position);
         assert(queue->getPosition() <= position);
          //TODO aconway 2011-12-14: Optimize this?
         for (SequenceNumber i = queue->getPosition(); i < position; ++i)
             dequeue(i,l);
         queue->setPosition(position);
-        QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), queue->getPosition()));
     } else {
-        QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), queue->getPosition()+1));
         msg.deliverTo(queue);
+        QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition());
     }
 }
 
+// Unused Exchange methods.
 bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
 bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
 bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Wed Dec 21 22:34:50 2011
@@ -23,6 +23,7 @@
  */
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/SequenceSet.h"
+#include <iosfwd>
 
 namespace qpid {
 
@@ -44,16 +45,18 @@ namespace ha {
  * Creates a ReplicatingSubscription on the primary by passing special
  * arguments to the consume command.
  *
- * THREAD SAFE: Called in arbitrary connection threads.
+ * THREAD UNSAFE: Only called in the connection thread of the source queue.
  */
 class QueueReplicator : public broker::Exchange
 {
   public:
     static const std::string DEQUEUE_EVENT_KEY;
     static const std::string POSITION_EVENT_KEY;
+    static std::string replicatorName(const std::string& queueName);
 
     QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
     ~QueueReplicator();
+
     std::string getType() const;
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
@@ -67,6 +70,8 @@ class QueueReplicator : public broker::E
     sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;
+
+  friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
 };
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed Dec 21 22:34:50 2011
@@ -20,7 +20,6 @@
  */
 
 #include "ReplicatingSubscription.h"
-#include "Logging.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/ConnectionState.h"
@@ -43,6 +42,13 @@ const string DOLLAR("$");
 const string INTERNAL("-internal");
 } // namespace
 
+
+ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
+    string url = rs.parent->getSession().getConnection().getUrl();
+    string qname= rs.getQueue()->getName();
+    return o << "HA: Primary: " << qname << "(" <<  url << "):";
+}
+
 string mask(const string& in)
 {
     return DOLLAR + in + INTERNAL;
@@ -96,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubs
     // can be re-introduced later. Last revision with the optimization:
     // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
 
-    QPID_LOG(debug, "HA: Started " << *this << " subscription " << name);
+    QPID_LOG(debug, *this << "Created subscription " << name);
 
     // Note that broker::Queue::getPosition() returns the sequence
     // number that will be assigned to the next message *minus 1*.
@@ -125,36 +131,39 @@ bool ReplicatingSubscription::deliver(Qu
              if (position - backupPosition > 1) {
                  // Position has advanced because of messages dequeued ahead of us.
                  SequenceNumber send(position);
-                 // Send the position before m was enqueued.
-                 sendPositionEvent(--send, l); 
+                 --send;   // Send the position before m was enqueued.
+                 sendPositionEvent(send, l); 
+                 QPID_LOG(trace, *this << "Sending position " << send
+                          << ", was " << backupPosition);
              }
              backupPosition = position;
         }
-        QPID_LOG(trace, "HA: Replicating " << QueuePos(m) << " to " << *this);
+        QPID_LOG(trace, *this << "Replicating message " << m.position);
     }
     return ConsumerImpl::deliver(m);
 }
 
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+// Called in the subscription's connection thread.
 void ReplicatingSubscription::cancel()
 {
-    QPID_LOG(debug, "HA: Cancelled " << *this);
+    QPID_LOG(debug, *this <<"Cancelled");
     getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
 }
 
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
 void ReplicatingSubscription::enqueued(const QueuedMessage& m)
 {
     //delay completion
     m.payload->getIngressCompletion().startCompleter();
 }
 
-// Called with lock held.
+// Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
 {
-    QPID_LOG(trace, "HA: Sending dequeues " << dequeues << " to " << *this);
+    QPID_LOG(trace, *this << "Sending dequeues " << dequeues);
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
@@ -163,12 +172,10 @@ void ReplicatingSubscription::sendDequeu
     sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
 }
 
-// Called with lock held.
+// Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendPositionEvent(
     SequenceNumber position, const sys::Mutex::ScopedLock&l )
 {
-    QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), position)
-             << " on " << *this);
     string buf(backupPosition.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     position.encode(buffer);
@@ -209,21 +216,24 @@ void ReplicatingSubscription::sendEvent(
 }
 
 // Called after the message has been removed from the deque and under
-// the message lock in the queue.
+// the message lock in the queue. Called in arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)
 {
+    QPID_LOG(trace, *this << "Dequeued message " << m.position);
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
-        QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << *this);
     }
     notify();                   // Ensure a call to doDispatch
+    // FIXME aconway 2011-12-20: not thread safe to access position here,
+    // we're not in the dispatch thread.
     if (m.position > position) {
         m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early on " << *this);
+        QPID_LOG(trace, *this << "Completed message " << m.position << " early");
     }
 }
 
+// Called in subscription's connection thread.
 bool ReplicatingSubscription::doDispatch()
 {
     {
@@ -235,19 +245,10 @@ bool ReplicatingSubscription::doDispatch
 
 ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
 ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
-    return delegate.deliver(m);
-}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
 void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
 bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
 bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
 OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
 
-
-ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
-    string url = rs.parent->getSession().getConnection().getUrl();
-    return o << rs.getQueue()->getName() << " backup on " << url;
-}
-
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Wed Dec 21 22:34:50 2011
@@ -49,7 +49,7 @@ namespace ha {
  * Runs on the primary. Delays completion of messages till the backup
  * has acknowledged, informs backup of locally dequeued messages.
  *
- * THREAD SAFE: Used as a consume in subscription's connection
+ * THREAD SAFE: Used as a consumer in subscription's connection
  * thread, and as a QueueObserver in arbitrary connection threads.
  */
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Wed Dec 21 22:34:50 2011
@@ -229,8 +229,6 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& map = i->asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                QPID_LOG(trace, "HA: Backup received event: schema=" << schema
-                         << " values=" << values);
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -244,16 +242,15 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-                QPID_LOG(trace, "HA: Backup received response type=" << type
-                         << " values=" << values);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
+                else QPID_LOG(error, "HA: Backup received unknown response: type=" << type
+                              << " values=" << values);
+
                 // FIXME aconway 2011-12-06: handle all relevant response types.
             }
-        } else {
-            QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers);
-        }
+        } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
     } catch (const std::exception& e) {
         QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
     }
@@ -280,11 +277,11 @@ void WiringReplicator::doEventQueueDecla
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
-            QPID_LOG(debug, "HA: Created backup queue from event: " << name);
+            QPID_LOG(debug, "HA: Backup created queue: " << name);
             startQueueReplicator(result.first);
         } else {
             // FIXME aconway 2011-12-02: what's the right way to handle this?
-            QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
+            QPID_LOG(warning, "HA: Backup queue already exists: " << name);
         }
     }
 }
@@ -293,11 +290,14 @@ void WiringReplicator::doEventQueueDelet
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "HA: Deleting backup queue from event: " << name);
+        QPID_LOG(debug, "HA: Backup deleting queue: " << name);
         broker.deleteQueue(
             name,
             values[USER].asString(),
             values[RHOST].asString());
+        // FIXME aconway 2011-12-21: causes race conditions? Restore.
+//         // Also delete the QueueReplicator exchange for this queue.
+//         broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
     }
 }
 
@@ -316,11 +316,11 @@ void WiringReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString()).second)
         {
-                    QPID_LOG(debug, "HA: created backup exchange from event: " << name);
+                    QPID_LOG(debug, "HA: Backup created exchange: " << name);
         } else {
             // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "HA: Exchange already exists on backup: " << name);
+            QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
         }
     }
 }
@@ -330,7 +330,7 @@ void WiringReplicator::doEventExchangeDe
     try {
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
         if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "HA: Deleting backup exchange:" << name);
+            QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
             broker.deleteExchange(
                 name,
                 values[USER].asString(),
@@ -352,7 +352,7 @@ void WiringReplicator::doEventBind(Varia
         framing::FieldTable args;
         amqp_0_10::translate(values[ARGS].asMap(), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+        QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
         exchange->bind(queue, key, &args);
@@ -377,12 +377,12 @@ void WiringReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
-        QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]);
+        QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
+        QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
     }
 }
 
@@ -400,9 +400,9 @@ void WiringReplicator::doResponseExchang
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second)
     {
-        QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]);
+        QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
     } else {
-        QPID_LOG(warning, "HA: Exchange already exists on backup:  " << values[QNAME]);
+        QPID_LOG(warning, "HA: Backup catch-up exchange already exists:  " << values[QNAME]);
     }
 }
 
@@ -442,7 +442,7 @@ void WiringReplicator::doResponseBind(Va
         amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName()
+        QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1221920&r1=1221919&r2=1221920&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Wed Dec 21 22:34:50 2011
@@ -108,6 +108,7 @@ class ShortTests(BrokerTest):
 
         # Test a series of messages, enqueue all then dequeue all.
         s = p.sender(queue("foo","all"))
+        self.wait(b, "foo")
         msgs = [str(i) for i in range(10)]
         for m in msgs: s.send(Message(m))
         self.assert_browse_retry(p, "foo", msgs)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org