You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/09 19:08:40 UTC

svn commit: r1521192 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/

Author: aconway
Date: Mon Sep  9 17:08:39 2013
New Revision: 1521192

URL: http://svn.apache.org/r1521192
Log:
QPID-4327: HA support for TX transactions - fix TX error messages.

- Ignore un-replicated queues when replicating transactions.
- Clean up cancel logic in QueueReplicator, which was causing "no such subscription" errors.
- Remove unnecessary exchange delete warnings.
- ha_test.py: Shorter timeout for starting cluster brokers.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Sep  9 17:08:39 2013
@@ -560,11 +560,7 @@ void BrokerReplicator::doEventExchangeDe
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
     string name = values[EXNAME].asString();
     boost::shared_ptr<Exchange> exchange = exchanges.find(name);
-    if (!exchange) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
-    } else if (!replicationTest.getLevel(*exchange)) {
-        QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
-    } else {
+    if (exchange && replicationTest.getLevel(*exchange)) {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
         if (exchangeTracker.get()) exchangeTracker->event(name);
         deleteExchange(name);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Sep  9 17:08:39 2013
@@ -203,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBroke
 }
 
 void HaBroker::shutdown(const std::string& message) {
-    QPID_LOG(critical, message);
+    QPID_LOG(critical, "Shutting down: " << message);
     broker.shutdown();
     throw Exception(message);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Mon Sep  9 17:08:39 2013
@@ -79,7 +79,9 @@ class PrimaryTxObserver::Exchange : publ
 const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
 
 PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
-    haBroker(hb), broker(hb.getBroker()), id(true),
+    haBroker(hb), broker(hb.getBroker()),
+    replicationTest(hb.getSettings().replicateDefault.get()),
+    id(true),
     exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
     failed(false), ended(false)
 {
@@ -98,8 +100,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
 
     pair<QueuePtr, bool> result =
         broker.getQueues().declare(
-            TRANSACTION_REPLICATOR_PREFIX+id.str(),
-            QueueSettings(/*durable*/false, /*autodelete*/true));
+            exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true));
     assert(result.second);
     txQueue = result.first;
     txQueue->deliver(TxMembersEvent(members).message());
@@ -109,25 +110,35 @@ PrimaryTxObserver::~PrimaryTxObserver() 
 
 
 void PrimaryTxObserver::initialize() {
-    broker.getExchanges().registerExchange(
-        boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
+    boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
+    FieldTable args = ex->getArgs();
+    args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
+    broker.getExchanges().registerExchange(ex);
 }
 
 void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
 {
     sys::Mutex::ScopedLock l(lock);
-    QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
-    enqueues[q] += m.getReplicationId();
-    txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
-    txQueue->deliver(m);
+    if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+        QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+        enqueues[q] += m.getReplicationId();
+        txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
+        txQueue->deliver(m);
+    }
 }
 
 void PrimaryTxObserver::dequeue(
     const QueuePtr& q, QueuePosition pos, ReplicationId id)
 {
     sys::Mutex::ScopedLock l(lock);
-    QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
-    txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+    if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+        QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+        txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+    }
+    else {
+        QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
+                 << LogMessageId(*q, pos, id));
+    }
 }
 
 void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Mon Sep  9 17:08:39 2013
@@ -23,7 +23,7 @@
  */
 
 #include "types.h"
-
+#include "ReplicationTest.h"
 #include "qpid/broker/TransactionObserver.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Uuid.h"
@@ -97,6 +97,7 @@ class PrimaryTxObserver : public broker:
     std::string logPrefix;
     HaBroker& haBroker;
     broker::Broker& broker;
+    ReplicationTest replicationTest;
 
     types::Uuid id;
     std::string exchangeName;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Sep  9 17:08:39 2013
@@ -112,7 +112,6 @@ QueueReplicator::QueueReplicator(HaBroke
       logPrefix("Backup of "+q->getName()+": "),
       subscribed(false),
       settings(hb.getSettings()),
-      destroyed(false),
       nextId(0), maxId(0)
 {
     args.setString(QPID_REPLICATE, printable(NONE).str());
@@ -181,8 +180,7 @@ void QueueReplicator::destroy() {
     boost::shared_ptr<Bridge> bridge2; // To call outside of lock
     {
         Mutex::ScopedLock l(lock);
-        if (destroyed) return;
-        destroyed = true;
+        if (!queue) return;     // Already destroyed
         QPID_LOG(debug, logPrefix << "Destroyed");
         bridge2 = bridge;       // call close outside the lock.
         // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
@@ -197,7 +195,7 @@ void QueueReplicator::destroy() {
 // Note: called with the Link lock held.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
     Mutex::ScopedLock l(lock);
-    if (destroyed) return;         // Already destroyed
+    if (!queue) return;         // Already destroyed
     sessionHandler = &sessionHandler_;
     AMQP_ServerProxy peer(sessionHandler->out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -225,14 +223,6 @@ void QueueReplicator::initializeBridge(B
     QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
 }
 
-void QueueReplicator::cancel(Mutex::ScopedLock&) {
-    if (sessionHandler) {
-        // Cancel the replicating subscription.
-        AMQP_ServerProxy peer(sessionHandler->out);
-        peer.getMessage().cancel(getName());
-    }
-}
-
 namespace {
 template <class T> T decodeContent(Message& m) {
     std::string content = m.getContent();
@@ -259,7 +249,7 @@ void QueueReplicator::route(Deliverable&
 {
     try {
         Mutex::ScopedLock l(lock);
-        if (destroyed) return;
+        if (!queue) return;     // Already destroyed
         broker::Message& message(deliverable.getMessage());
         string key(message.getRoutingKey());
         if (!isEventKey(message.getRoutingKey())) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon Sep  9 17:08:39 2013
@@ -94,7 +94,6 @@ class QueueReplicator : public broker::E
 
     virtual void deliver(const broker::Message&);
     virtual void destroy();             // Called when the queue is destroyed.
-    void cancel(sys::Mutex::ScopedLock&);
 
     sys::Mutex lock;
     HaBroker& haBroker;
@@ -122,7 +121,6 @@ class QueueReplicator : public broker::E
 
     bool subscribed;
     const Settings& settings;
-    bool destroyed;
     PositionMap positions;
     ReplicationIdSet idSet; // Set of replicationIds on the queue.
     ReplicationId nextId;   // ID for next message to arrive.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Sep  9 17:08:39 2013
@@ -41,6 +41,13 @@ namespace ha {
 /**
  * Test whether something is replicated, taking into account the
  * default replication level.
+ *
+ * The primary uses a ReplicationTest with default based on configuration
+ * settings, and marks objects to be replicated with an explicit replication
+ * argument.
+ *
+ * The backup uses a default of NONE, so it always accepts what the primary has
+ * marked on the object.
  */
 class ReplicationTest
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Mon Sep  9 17:08:39 2013
@@ -215,21 +215,24 @@ void TxReplicator::rollback(const string
     end(l);
 }
 
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
     TxMembersEvent e;
     decodeStr(data, e);
     QPID_LOG(debug, logPrefix << "Members: " << e.members);
     if (!e.members.count(haBroker.getMembership().getSelf())) {
         QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
-        // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
-        haBroker.getBroker().deleteQueue(
-            getQueue()->getName(), haBroker.getUserId(), string());
+        end(l);
     }
 }
 
-void TxReplicator::end(sys::Mutex::ScopedLock& l) {
+void TxReplicator::end(sys::Mutex::ScopedLock&) {
     complete = true;
-    cancel(l);
+    if (!getQueue()) return;    // Already destroyed
+    // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+    // Need to do this now to cancel the subscription to the primary tx-queue
+    // which informs the primary that we have completed the transaction.
+    haBroker.getBroker().deleteQueue(
+        getQueue()->getName(), haBroker.getUserId(), string());
 }
 
 void TxReplicator::destroy() {

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Sep  9 17:08:39 2013
@@ -125,7 +125,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
+                 "--log-enable=debug+:ha::",
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont
@@ -323,7 +323,7 @@ class HaCluster(object):
         ha_port = self._ports[i]
         b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
                      args=args, **self.kwargs)
-        b.ready()
+        b.ready(timeout=5)
         return b
 
     def start(self):

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Sep  9 17:08:39 2013
@@ -562,8 +562,7 @@ class ReplicationTests(HaBrokerTest):
             return
         acl=os.path.join(os.getcwd(), "policy.acl")
         aclf=file(acl,"w")
-        # Verify that replication works with auth=yes and HA user has at least the following
-        # privileges:
+        # Minimum set of privileges required for the HA user.
         aclf.write("""
 # HA user
 acl allow zag@QPID access queue
@@ -592,14 +591,14 @@ acl deny all all
             client_credentials=Credentials("zag", "zag", "PLAIN"))
         c = cluster[0].connect(username="zig", password="zig")
         s0 = c.session();
-        s0.receiver("q;{create:always}")
-        s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+        s0.sender("q;{create:always}")
+        s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
         s0.sender("ex").send("foo");
         s1 = c.session(transactional=True)
-        s1.sender("ex").send("tx");
+        s1.sender("ex").send("foo-tx");
         cluster[1].assert_browse_backup("q", ["foo"])
         s1.commit()
-        cluster[1].assert_browse_backup("q", ["foo", "tx"])
+        cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
 
     def test_alternate_exchange(self):
         """Verify that alternate-exchange on exchanges and queues is propagated



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org