You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/08 11:24:16 UTC

svn commit: r1616704 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Fri Aug  8 09:24:15 2014
New Revision: 1616704

URL: http://svn.apache.org/r1616704
Log:
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.

There were several problems:

1. Positions of transactionally enqueued messages were not known to QueueReplicator, so they were
   not dequeued on backup if dequeued outside a TX on primary.

2. Race condition if tx created immediately after queue could cause duplication of TX message.

3. Replication IDs were not being set during recovery from store (regression, store change?)

Fix:

1. Update QueueReplicator positions via QueueObserver::enqueued to see all enqueues.
2. Check for duplicate replication-ids on backup in QueueReplicator::route.
3. Set replication-id in publish() if not already set in record().

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/types.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Aug  8 09:24:15 2014
@@ -29,6 +29,7 @@
 #include "qpid/management/Manageable.h"
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
+#include "qpid/assert.h"
 
 #include <algorithm>
 #include <string.h>
@@ -46,11 +47,11 @@ using std::string;
 namespace qpid {
 namespace broker {
 
-Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0)
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
 {}
 
 Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
-    : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0)
+    : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
 {
     if (persistentContext) persistentContext->setIngressCompletion(e);
 }
@@ -297,9 +298,18 @@ void Message::processProperties(MapHandl
     sharedState->processProperties(handler);
 }
 
-uint64_t Message::getReplicationId() const { return replicationId; }
+bool Message::hasReplicationId() const {
+    return isReplicationIdSet;
+}
+
+uint64_t Message::getReplicationId() const {
+    return replicationId;
+}
 
-void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+void Message::setReplicationId(framing::SequenceNumber id) {
+    replicationId = id;
+    isReplicationIdSet = true;
+}
 
 sys::AbsTime Message::getExpiration() const
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Aug  8 09:24:15 2014
@@ -168,6 +168,7 @@ public:
 
     QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
     QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+    QPID_BROKER_EXTERN bool hasReplicationId() const;
     QPID_BROKER_EXTERN uint64_t getReplicationId() const;
     QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
 
@@ -214,6 +215,7 @@ public:
     MessageState state;
     qpid::framing::SequenceNumber sequence;
     framing::SequenceNumber replicationId;
+    bool isReplicationIdSet:1;
 
     void annotationsChanged();
     bool getTtl(uint64_t&, uint64_t expiredValue) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Aug  8 09:24:15 2014
@@ -71,7 +71,7 @@ class HaBroker::BrokerObserver : public 
   public:
     void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
         q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
-        q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+        q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
     }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h Fri Aug  8 09:24:15 2014
@@ -43,10 +43,31 @@ namespace ha {
 class IdSetter : public broker::MessageInterceptor
 {
   public:
-    IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
-    void record(broker::Message& m) { m.setReplicationId(nextId++); }
+    IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) {
+        QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId);
+    }
+
+    void record(broker::Message& m) {
+        // Record is called when a message is first delivered to a queue, before it has
+        // been enqueued or saved in a transaction buffer. This is when we normally want
+        // to assign a replication-id.
+        m.setReplicationId(nextId++);
+        QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
+    }
+
+    void publish(broker::Message& m) {
+        // Publish is called when a message is assigned a position on the queue,
+        // after any transaction has comitted. Normally this is too late to
+        // assign a replication-id but during broker start-up and recovery from
+        // store record() is not called, so set the ID now if not already set.
+        if (!m.hasReplicationId()) {
+            m.setReplicationId(nextId++);
+            QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m));
+        }
+    }
 
   private:
+    std::string queue;
     sys::AtomicValue<uint32_t> nextId;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Fri Aug  8 09:24:15 2014
@@ -148,7 +148,7 @@ void PrimaryTxObserver::enqueue(const Qu
 {
     Mutex::ScopedLock l(lock);
     if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
-        QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+        QPID_LOG(trace, logPrefix << "Enqueue: " << logMessageId(*q, m.getReplicationId()));
         checkState(SENDING, "Too late for enqueue");
         empty = false;
         enqueues[q] += m.getReplicationId();
@@ -163,7 +163,7 @@ void PrimaryTxObserver::dequeue(
     Mutex::ScopedLock l(lock);
     checkState(SENDING, "Too late for dequeue");
     if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
-        QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+        QPID_LOG(trace, logPrefix << "Dequeue: " << logMessageId(*q, pos, id));
         empty = false;
         dequeues[q] += id;
         txQueue->deliver(TxDequeueEvent(q->getName(), id).message());

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Aug  8 09:24:15 2014
@@ -72,7 +72,7 @@ void QueueGuard::enqueued(const Message&
     ReplicationId id = m.getReplicationId();
     Mutex::ScopedLock l(lock);
     if (cancelled) return;  // Don't record enqueues after we are cancelled.
-    QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
+    QPID_LOG(trace, logPrefix << "Delayed completion of " << logMessageId(queue, m));
     delayed[id] = m.getIngressCompletion();
     m.getIngressCompletion()->startCompleter();
 }
@@ -80,7 +80,7 @@ void QueueGuard::enqueued(const Message&
 // NOTE: Called with message lock held.
 void QueueGuard::dequeued(const Message& m) {
     ReplicationId id = m.getReplicationId();
-    QPID_LOG(trace, logPrefix << "Dequeued "  << LogMessageId(queue, m));
+    QPID_LOG(trace, logPrefix << "Dequeued "  << logMessageId(queue, m));
     Mutex::ScopedLock l(lock);
     complete(id, l);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Aug  8 09:24:15 2014
@@ -106,22 +106,34 @@ class QueueReplicator::ErrorListener : p
 
 class QueueReplicator::QueueObserver : public broker::QueueObserver {
   public:
-    QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {}
-    void enqueued(const Message&) {}
-    void dequeued(const Message&) {}
+    typedef boost::shared_ptr<QueueReplicator> Ptr;
+    QueueObserver(Ptr qr) : queueReplicator(qr) {}
+
+    void enqueued(const Message& m) {
+        Ptr qr = queueReplicator.lock();
+        if (qr) qr->enqueued(m);
+    }
+
+    void dequeued(const Message& m) {
+        Ptr qr = queueReplicator.lock();
+        if (qr) qr->dequeued(m);
+    }
+
     void acquired(const Message&) {}
     void requeued(const Message&) {}
     void consumerAdded( const Consumer& ) {}
     void consumerRemoved( const Consumer& ) {}
     // Queue observer is destroyed when the queue is.
     void destroy() {
-        boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+        Ptr qr = queueReplicator.lock();
         if (qr) qr->destroy();
     }
+
   private:
     boost::weak_ptr<QueueReplicator> queueReplicator;
 };
 
+
 boost::shared_ptr<QueueReplicator> QueueReplicator::create(
     HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l)
 {
@@ -278,48 +290,73 @@ void QueueReplicator::dequeueEvent(const
     QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
     //TODO: should be able to optimise the following
     for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
-        PositionMap::iterator j = positions.find(*i);
-        if (j != positions.end()) queue->dequeueMessageAt(j->second);
+        QueuePosition position;
+        {
+            Mutex::ScopedLock l(lock);
+            PositionMap::iterator j = positions.find(*i);
+            if (j == positions.end()) continue;
+            position = j->second;
+        }
+        queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
+        // positions will be cleaned up in dequeued()
     }
 }
 
 // Called in connection thread of the queues bridge to primary.
-
 void QueueReplicator::route(Deliverable& deliverable)
 {
     try {
-        Mutex::ScopedLock l(lock);
-        if (!queue) return;     // Already destroyed
         broker::Message& message(deliverable.getMessage());
-        string key(message.getRoutingKey());
-        if (!isEventKey(message.getRoutingKey())) {
+        {
+            Mutex::ScopedLock l(lock);
+            if (!queue) return;     // Already destroyed
+            string key(message.getRoutingKey());
+            if (isEventKey(key)) {
+                DispatchMap::iterator i = dispatch.find(key);
+                if (i == dispatch.end()) {
+                    QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+                } else {
+                    (i->second)(message.getContent(), l);
+                }
+                return;
+            }
             ReplicationId id = nextId++;
-            maxId = std::max(maxId, id);
             message.setReplicationId(id);
-            deliver(message);
-            QueuePosition position = queue->getPosition();
-            positions[id] = position;
-            QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
-        }
-        else {
-            DispatchMap::iterator i = dispatch.find(key);
-            if (i == dispatch.end()) {
-                QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
-            }
-            else {
-                (i->second)(message.getContent(), l);
+            PositionMap::iterator i = positions.find(id);
+            if (i != positions.end()) {
+                QPID_LOG(trace, logPrefix << "Already on queue: " << logMessageId(*queue, message));
+                return;
             }
+            QPID_LOG(trace, logPrefix << "Received: " << logMessageId(*queue, message));
         }
+        deliver(message);       // Outside lock, will call enqueued()
     }
     catch (const std::exception& e) {
         haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
     }
+
 }
 
 void QueueReplicator::deliver(const broker::Message& m) {
     queue->deliver(m);
 }
 
+// Called via QueueObserver when message is enqueued. Could be as part of deliver()
+// or in a different thread if a message is enqueued via a transaction.
+//
+void QueueReplicator::enqueued(const broker::Message& m) {
+    Mutex::ScopedLock l(lock);
+    maxId = std::max(maxId, ReplicationId(m.getReplicationId()));
+    positions[m.getReplicationId()] = m.getSequence();
+    QPID_LOG(trace, logPrefix << "Enqueued " << logMessageId(*queue, m));
+}
+
+// Called via QueueObserver
+void QueueReplicator::dequeued(const broker::Message& m) {
+    Mutex::ScopedLock l(lock);
+    positions.erase(m.getReplicationId());
+}
+
 void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
     nextId = decodeStr<IdEvent>(data).id;
 }
@@ -349,8 +386,9 @@ std::string QueueReplicator::getType() c
 void QueueReplicator::promoted() {
     if (queue) {
         // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+        QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
         queue->getMessageInterceptors().add(
-            boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+            boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1)));
         // Process auto-deletes
         if (queue->isAutoDelete()) {
             // Make a temporary shared_ptr to prevent premature deletion of queue.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Aug  8 09:24:15 2014
@@ -80,6 +80,10 @@ class QueueReplicator : public broker::E
 
     void route(broker::Deliverable&);
 
+    // Called via QueueObserver
+    void enqueued(const broker::Message&);
+    void dequeued(const broker::Message&);
+
     // Set if the queue has ever been subscribed to, used for auto-delete cleanup.
     void setSubscribed() { subscribed = true; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug  8 09:24:15 2014
@@ -216,14 +216,14 @@ bool ReplicatingSubscription::deliver(
     try {
         bool result = false;
         if (skipEnqueue.contains(id)) {
-            QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
+            QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m));
             skipEnqueue -= id;
             guard->complete(id); // This will never be acknowledged.
             notify();
             result = true;
         }
         else {
-            QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+            QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m));
             if (!ready && !isGuarded(l)) unready += id;
             sendIdEvent(id, l);
             result = ConsumerImpl::deliver(c, m);
@@ -231,7 +231,7 @@ bool ReplicatingSubscription::deliver(
         checkReady(l);
         return result;
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
+        QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m)
                  << ": " << e.what());
         throw;
     }
@@ -268,7 +268,7 @@ void ReplicatingSubscription::acknowledg
     // Finish completion of message, it has been acknowledged by the backup.
     ReplicationId id = r.getReplicationId();
     QPID_LOG(trace, logPrefix << "Acknowledged " <<
-             LogMessageId(*getQueue(), r.getMessageId(), id));
+             logMessageId(*getQueue(), r.getMessageId(), id));
     guard->complete(id);
     {
         Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug  8 09:24:15 2014
@@ -137,8 +137,8 @@ class ReplicatingSubscription :
 
     BrokerInfo getBrokerInfo() const { return info; }
 
-void skipEnqueues(const ReplicationIdSet& ids);
-void skipDequeues(const ReplicationIdSet& ids);
+    void skipEnqueues(const ReplicationIdSet& ids);
+    void skipDequeues(const ReplicationIdSet& ids);
 
   protected:
     bool doDispatch();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Fri Aug  8 09:24:15 2014
@@ -127,20 +127,19 @@ void TxReplicator::sendMessage(const bro
     }
 }
 
-void TxReplicator::route(broker::Deliverable& deliverable) {
-    QueueReplicator::route(deliverable);
-}
-
 void TxReplicator::deliver(const broker::Message& m_) {
-    sys::Mutex::ScopedLock l(lock);
-    if (!txBuffer) return;
-    // Deliver message to the target queue, not the tx-queue.
+    boost::intrusive_ptr<broker::TxBuffer> txbuf;
     broker::Message m(m_);
-    m.setReplicationId(enq.id); // Use replicated id.
-    boost::shared_ptr<broker::Queue> queue =
-        haBroker.getBroker().getQueues().get(enq.queue);
-    QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
-    DeliverableMessage dm(m, txBuffer.get());
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (!txBuffer) return;
+        txbuf = txBuffer;
+        m.setReplicationId(enq.id); // Use enqueued replicated id.
+    }
+    // Deliver message to the target queue, not the tx-queue.
+    boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue);
+    QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId()));
+    DeliverableMessage dm(m, txbuf.get());
     dm.deliverTo(queue);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Fri Aug  8 09:24:15 2014
@@ -66,7 +66,6 @@ class TxReplicator : public QueueReplica
     std::string getType() const;
 
     // QueueReplicator overrides
-    void route(broker::Deliverable& deliverable);
     using QueueReplicator::destroy;
     void destroy(sys::Mutex::ScopedLock&);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Fri Aug  8 09:24:15 2014
@@ -95,17 +95,24 @@ ostream& operator<<(ostream& o, const Uu
     return o;
 }
 
-LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) :
-    queue(q.getName()), position(pos), replicationId(id) {}
 
-LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) :
-    queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {}
-
-LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) :
-    queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {}
-
-std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
-    return o  << m.queue << "[" << m.position << "]=" << m.replicationId;
+std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id) {
+    return Msg() << q << "[" << pos << "]" << "=" << id;
+}
+std::string logMessageId(const std::string& q, ReplicationId id) {
+    return Msg() << q << "[]" << "=" << id;
+}
+std::string logMessageId(const std::string& q, const broker::Message& m)  {
+    return logMessageId(q, m.getSequence(), m.getReplicationId());
+}
+std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) {
+    return logMessageId(q.getName(), pos, id);
+}
+std::string logMessageId(const broker::Queue& q, ReplicationId id) {
+    return logMessageId(q.getName(), id);
+}
+std::string logMessageId(const broker::Queue& q, const broker::Message& m) {
+    return logMessageId(q.getName(), m);
 }
 
 void UuidSet::encode(framing::Buffer& b) const {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.h?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.h Fri Aug  8 09:24:15 2014
@@ -132,17 +132,13 @@ typedef framing::SequenceNumber Replicat
 typedef framing::SequenceSet QueuePositionSet;
 typedef framing::SequenceSet ReplicationIdSet;
 
-/** Helper for logging message ID  */
-struct LogMessageId {
-    typedef boost::shared_ptr<broker::Queue> QueuePtr;
-    LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
-    LogMessageId(const broker::Queue& q, const broker::Message& m);
-    LogMessageId(const std::string& q, const broker::Message& m);
-    const std::string& queue;
-    QueuePosition position;
-    ReplicationId replicationId;
-};
-std::ostream& operator<<(std::ostream&, const LogMessageId&);
+/** Helpers for logging message ID  */
+std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id);
+std::string logMessageId(const std::string& q, ReplicationId id);
+std::string logMessageId(const std::string& q, const broker::Message& m);
+std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
+std::string logMessageId(const broker::Queue& q, ReplicationId id);
+std::string logMessageId(const broker::Queue& q, const broker::Message& m);
 
 /** Return short version of human-readable UUID. */
 inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); }

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Aug  8 09:24:15 2014
@@ -428,17 +428,22 @@ class Broker(Popen):
                 assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
         finally: log.close()
 
+def receiver_iter(receiver, timeout=0):
+    """Make an iterator out of a receiver. Returns messages till Empty is raised."""
+    try:
+        while True:
+            yield receiver.fetch(timeout=timeout)
+    except qm.Empty:
+        pass
+
 def browse(session, queue, timeout=0, transform=lambda m: m.content):
     """Return a list with the contents of each message on queue."""
     r = session.receiver("%s;{mode:browse}"%(queue))
     r.capacity = 100
     try:
-        contents = []
-        try:
-            while True: contents.append(transform(r.fetch(timeout=timeout)))
-        except qm.Empty: pass
-    finally: r.close()
-    return contents
+        return [transform(m) for m in receiver_iter(r, timeout)]
+    finally:
+        r.close()
 
 def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
     """Assert that the contents of messages on queue (as retrieved

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Aug  8 09:24:15 2014
@@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest):
 
     def test_stalled_backup(self):
         """Make sure that a stalled backup broker does not stall the primary"""
-        # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
         cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
         os.kill(cluster[1].pid, signal.SIGSTOP)
         s = cluster[0].connect().session()
@@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest):
         """Verify that a backup erases queue data from store recovery before
         doing catch-up from the primary."""
         if self.check_skip(): return
-        cluster = HaCluster(self, 2)
+        cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
         sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
         s1 = sn.sender("q1;{create:always,node:{durable:true}}")
         for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest):
         tx.acknowledge()
         tx.commit()
         tx.sync()
+        tx.close()
+
+        for b in cluster:
+            self.assert_simple_commit_outcome(b, tx_queues)
+
+        # Verify non-tx dequeue is replicated correctly
+        c = cluster.connect(0, protocol=self.tx_protocol)
+        r = c.session().receiver("b")
+        ri = receiver_iter(r, timeout=1)
+        self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
+        r.session.acknowledge()
+        for b in cluster: b.assert_browse_backup("b", [], msg=b)
 
+    def check_enq_deq(self, cluster, queue, expect):
+        for b in cluster:
+            q = b.agent.getQueue(queue)
+            self.assertEqual(
+                (b.name,)+expect,
+                (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
+
+    def test_tx_enq_notx_deq(self):
+        """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+        cluster = HaCluster(self, 2, test_store=True)
+        c = cluster.connect(0, protocol=self.tx_protocol)
+
+        tx = c.session(transactional=True)
+        c.session().sender("qq;{create:always}").send("m1")
+        tx.sender("qq;{create:always}").send("tx")
+        tx.commit()
         tx.close()
-        for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+        c.session().sender("qq;{create:always}").send("m2")
+        self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
+
+        notx = c.session()
+        self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
+        notx.acknowledge()
+        self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
+        for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+        for b in cluster: self.assert_tx_clean(b)
+
+    def test_tx_enq_notx_deq_qpid_send(self):
+        """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+        cluster = HaCluster(self, 2, test_store=True)
+
+        self.popen(
+            ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
+             '--content-string=foo']
+        ).assert_exit_ok()
+        for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
+        self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
+
+        self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
+        self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
+        for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+        for b in cluster: self.assert_tx_clean(b)
 
     def assert_tx_clean(self, b):
         """Verify that there are no transaction artifacts

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1616704&r1=1616703&r2=1616704&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Fri Aug  8 09:24:15 2014
@@ -205,7 +205,7 @@ struct Transfer : public TransactionalCl
                     }
                     session.commit();
                     t++;
-                    if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+                    if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
                 } catch (const TransactionAborted&) {
                     std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
                     session = connection.createTransactionalSession();
@@ -246,6 +246,16 @@ struct Controller : public Client
 
         for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
             std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+
+            // Clear out any garbage on queues.
+            Receiver receiver = session.createReceiver(address);
+            Message rmsg;
+            uint count(0);
+            while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count;
+            session.acknowledge();
+            receiver.close();
+            if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl;
+
             Sender sender = session.createSender(address);
             if (i == queues.begin()) {
                 for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org