You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/09 19:08:23 UTC

svn commit: r1521190 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/

Author: aconway
Date: Mon Sep  9 17:08:23 2013
New Revision: 1521190

URL: http://svn.apache.org/r1521190
Log:
QPID-4327: HA support for TX transactions - fix auth bugs.

- Set auth info on status check connections
- Clean up status check logging
- Use username@realm for authentication name (was using just username)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Mon Sep  9 17:08:23 2013
@@ -52,9 +52,7 @@ using sys::Mutex;
 Backup::Backup(HaBroker& hb, const Settings& s) :
     logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
     haBroker(hb), broker(hb.getBroker()), settings(s),
-    statusCheck(
-        new StatusCheck(
-            logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
+    statusCheck(new StatusCheck(hb))
 {}
 
 void Backup::setBrokerUrl(const Url& brokers) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Sep  9 17:08:23 2013
@@ -799,7 +799,7 @@ void BrokerReplicator::deleteQueue(const
         // messages. Any reroutes will be done at the primary and
         // replicated as normal.
         if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
-        haBroker.deleteQueue(name, remoteHost);
+        haBroker.getBroker().deleteQueue(name, userId, remoteHost);
         QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Sep  9 17:08:23 2013
@@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast;
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
+      userId(s.username+"@"+b.getOptions().realm),
       broker(b),
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
@@ -221,8 +222,4 @@ boost::shared_ptr<QueueReplicator> HaBro
         broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
 }
 
-void HaBroker::deleteQueue(const string& name, const string& connectionId) {
-    broker.deleteQueue(name, settings.username, connectionId);
-}
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Sep  9 17:08:23 2013
@@ -102,9 +102,8 @@ class HaBroker : public management::Mana
 
     boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
 
-    /**@param connectionId optional, only available on backup */
-    void deleteQueue(const std::string& name,
-                     const std::string& connectionId=std::string());
+    /** Authenticated user ID for queue create/delete */
+    std::string getUserId() const { return userId; }
 
   private:
     void setPublicUrl(const Url&);
@@ -116,6 +115,7 @@ class HaBroker : public management::Mana
     // Immutable members
     const types::Uuid systemId;
     const Settings settings;
+    const std::string userId;
 
     // Member variables protected by lock
     mutable sys::Mutex lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Mon Sep  9 17:08:23 2013
@@ -169,7 +169,8 @@ void PrimaryTxObserver::rollback() {
 void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
     // Don't destroy the tx-queue if there are connected subscriptions.
     if (ended && unfinished.empty()) {
-        haBroker.deleteQueue(txQueue->getName());
+        haBroker.getBroker().deleteQueue(
+            txQueue->getName(), haBroker.getUserId(), string());
         broker.getExchanges().destroy(getExchangeName());
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Mon Sep  9 17:08:23 2013
@@ -20,6 +20,8 @@
  */
 #include "StatusCheck.h"
 #include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
@@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpi
 
 class StatusCheckThread : public sys::Runnable {
   public:
-    StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
-        : url(addr), statusCheck(sc), brokerInfo(self) {}
+    StatusCheckThread(StatusCheck& sc, const qpid::Address& addr)
+        : url(addr), statusCheck(sc) {}
     void run();
   private:
     Url url;
     StatusCheck& statusCheck;
-    BrokerInfo brokerInfo;
 };
 
 void StatusCheckThread::run() {
-    QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+    string logPrefix("Status check " + url.str() + ": ");
     Connection c;
     try {
+        // Check for self connections
         Variant::Map options, clientProperties;
-        clientProperties = brokerInfo.asMap(); // Detect self connections.
         clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
         clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
-        clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
+        clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
 
+        // Set connection options
+        Settings settings(statusCheck.haBroker.getSettings());
+        if (settings.username.size()) options["username"] = settings.username;
+        if (settings.password.size()) options["password"] = settings.password;
+        if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
         options["client-properties"] = clientProperties;
-        options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+        sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
+        options["heartbeat"] = heartbeat/sys::TIME_SEC;
         c = Connection(url.str(), options);
 
         c.open();
@@ -81,7 +88,7 @@ void StatusCheckThread::run() {
         content["_object_id"] = oid;
         encode(content, request);
         s.send(request);
-        messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC);
+        messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
         Message response = r.fetch(timeout);
         session.acknowledge();
         Variant::List contentIn;
@@ -89,23 +96,22 @@ void StatusCheckThread::run() {
         if (contentIn.size() == 1) {
             Variant::Map details = contentIn.front().asMap()["_values"].asMap();
             string status = details["status"].getString();
+            QPID_LOG(debug, logPrefix << status);
             if (status != "joining") {
                 statusCheck.setPromote(false);
-                QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
-                         << status << ", this broker will refuse promotion.");
+                QPID_LOG(info, logPrefix << "Joining established cluster");
             }
-            QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
         }
     } catch(const exception& error) {
-        QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url
-                 <<  ": " << error.what());
+        // Its not an error to fail to connect to self.
+        if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
+            QPID_LOG(warning, logPrefix << error.what());
     }
     try { c.close(); } catch(...) {}
     delete this;
 }
 
-StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
-    : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
 {}
 
 StatusCheck::~StatusCheck() {
@@ -116,7 +122,7 @@ StatusCheck::~StatusCheck() {
 void StatusCheck::setUrl(const Url& url) {
     Mutex::ScopedLock l(lock);
     for (size_t i = 0; i < url.size(); ++i)
-        threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+        threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
 }
 
 bool StatusCheck::canPromote() {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Mon Sep  9 17:08:23 2013
@@ -23,6 +23,7 @@
  */
 
 #include "BrokerInfo.h"
+#include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
@@ -33,6 +34,8 @@
 namespace qpid {
 namespace ha {
 
+class HaBroker;
+
 // TODO aconway 2012-12-21: This solution is incomplete. It will only protect
 // against bad promotion if there are READY brokers when this broker starts.
 // It will not help the situation where brokers became READY after this one starts.
@@ -51,7 +54,7 @@ namespace ha {
 class StatusCheck
 {
   public:
-    StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
+    StatusCheck(HaBroker&);
     ~StatusCheck();
     void setUrl(const Url&);
     bool canPromote();
@@ -59,12 +62,11 @@ class StatusCheck
   private:
     void setPromote(bool p);
 
-    std::string logPrefix;
     sys::Mutex lock;
     std::vector<sys::Thread> threads;
     bool promote;
-    sys::Duration linkHeartbeatInterval;
-    BrokerInfo brokerInfo;
+    HaBroker& haBroker;
+
   friend class StatusCheckThread;
 };
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Mon Sep  9 17:08:23 2013
@@ -222,7 +222,8 @@ void TxReplicator::members(const string&
     if (!e.members.count(haBroker.getMembership().getSelf())) {
         QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
         // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
-        haBroker.deleteQueue(getQueue()->getName());
+        haBroker.getBroker().deleteQueue(
+            getQueue()->getName(), haBroker.getUserId(), string());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Sep  9 17:08:23 2013
@@ -125,7 +125,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
+                 "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Sep  9 17:08:23 2013
@@ -565,6 +565,7 @@ class ReplicationTests(HaBrokerTest):
         # Verify that replication works with auth=yes and HA user has at least the following
         # privileges:
         aclf.write("""
+# HA user
 acl allow zag@QPID access queue
 acl allow zag@QPID create queue
 acl allow zag@QPID consume queue
@@ -576,6 +577,9 @@ acl allow zag@QPID publish exchange
 acl allow zag@QPID delete exchange
 acl allow zag@QPID access method
 acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
 acl deny all all
  """)
         aclf.close()
@@ -586,14 +590,16 @@ acl deny all all
                   "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
                   ],
             client_credentials=Credentials("zag", "zag", "PLAIN"))
-        s0 = cluster[0].connect(username="zag", password="zag").session();
+        c = cluster[0].connect(username="zig", password="zig")
+        s0 = c.session();
         s0.receiver("q;{create:always}")
         s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
-        cluster[1].wait_backup("q")
-        cluster[1].wait_backup("ex")
-        s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
-        s1.sender("ex").send("foo");
-        self.assertEqual(s1.receiver("q").fetch().content, "foo")
+        s0.sender("ex").send("foo");
+        s1 = c.session(transactional=True)
+        s1.sender("ex").send("tx");
+        cluster[1].assert_browse_backup("q", ["foo"])
+        s1.commit()
+        cluster[1].assert_browse_backup("q", ["foo", "tx"])
 
     def test_alternate_exchange(self):
         """Verify that alternate-exchange on exchanges and queues is propagated



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org