You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/09 19:08:23 UTC
svn commit: r1521190 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/
Author: aconway
Date: Mon Sep 9 17:08:23 2013
New Revision: 1521190
URL: http://svn.apache.org/r1521190
Log:
QPID-4327: HA support for TX transactions - fix auth bugs.
- Set auth info on status check connections
- Clean up status check logging
- Use username@realm for authentication name (was using just username)
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Mon Sep 9 17:08:23 2013
@@ -52,9 +52,7 @@ using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
haBroker(hb), broker(hb.getBroker()), settings(s),
- statusCheck(
- new StatusCheck(
- logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
+ statusCheck(new StatusCheck(hb))
{}
void Backup::setBrokerUrl(const Url& brokers) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Sep 9 17:08:23 2013
@@ -799,7 +799,7 @@ void BrokerReplicator::deleteQueue(const
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
- haBroker.deleteQueue(name, remoteHost);
+ haBroker.getBroker().deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Sep 9 17:08:23 2013
@@ -64,6 +64,7 @@ using boost::dynamic_pointer_cast;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ userId(s.username+"@"+b.getOptions().realm),
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
@@ -221,8 +222,4 @@ boost::shared_ptr<QueueReplicator> HaBro
broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
}
-void HaBroker::deleteQueue(const string& name, const string& connectionId) {
- broker.deleteQueue(name, settings.username, connectionId);
-}
-
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Sep 9 17:08:23 2013
@@ -102,9 +102,8 @@ class HaBroker : public management::Mana
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
- /**@param connectionId optional, only available on backup */
- void deleteQueue(const std::string& name,
- const std::string& connectionId=std::string());
+ /** Authenticated user ID for queue create/delete */
+ std::string getUserId() const { return userId; }
private:
void setPublicUrl(const Url&);
@@ -116,6 +115,7 @@ class HaBroker : public management::Mana
// Immutable members
const types::Uuid systemId;
const Settings settings;
+ const std::string userId;
// Member variables protected by lock
mutable sys::Mutex lock;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Mon Sep 9 17:08:23 2013
@@ -169,7 +169,8 @@ void PrimaryTxObserver::rollback() {
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
if (ended && unfinished.empty()) {
- haBroker.deleteQueue(txQueue->getName());
+ haBroker.getBroker().deleteQueue(
+ txQueue->getName(), haBroker.getUserId(), string());
broker.getExchanges().destroy(getExchangeName());
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Mon Sep 9 17:08:23 2013
@@ -20,6 +20,8 @@
*/
#include "StatusCheck.h"
#include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
@@ -41,27 +43,32 @@ const string HA_BROKER = "org.apache.qpi
class StatusCheckThread : public sys::Runnable {
public:
- StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
- : url(addr), statusCheck(sc), brokerInfo(self) {}
+ StatusCheckThread(StatusCheck& sc, const qpid::Address& addr)
+ : url(addr), statusCheck(sc) {}
void run();
private:
Url url;
StatusCheck& statusCheck;
- BrokerInfo brokerInfo;
};
void StatusCheckThread::run() {
- QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+ string logPrefix("Status check " + url.str() + ": ");
Connection c;
try {
+ // Check for self connections
Variant::Map options, clientProperties;
- clientProperties = brokerInfo.asMap(); // Detect self connections.
clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ // Set connection options
+ Settings settings(statusCheck.haBroker.getSettings());
+ if (settings.username.size()) options["username"] = settings.username;
+ if (settings.password.size()) options["password"] = settings.password;
+ if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+ sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
+ options["heartbeat"] = heartbeat/sys::TIME_SEC;
c = Connection(url.str(), options);
c.open();
@@ -81,7 +88,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(statusCheck.linkHeartbeatInterval/sys::TIME_MSEC);
+ messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -89,23 +96,22 @@ void StatusCheckThread::run() {
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
string status = details["status"].getString();
+ QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
statusCheck.setPromote(false);
- QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
- << status << ", this broker will refuse promotion.");
+ QPID_LOG(info, logPrefix << "Joining established cluster");
}
- QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(info, statusCheck.logPrefix << "Error checking status of " << url
- << ": " << error.what());
+ // Its not an error to fail to connect to self.
+ if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
+ QPID_LOG(warning, logPrefix << error.what());
}
try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
- : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
{}
StatusCheck::~StatusCheck() {
@@ -116,7 +122,7 @@ StatusCheck::~StatusCheck() {
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
for (size_t i = 0; i < url.size(); ++i)
- threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+ threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
bool StatusCheck::canPromote() {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Mon Sep 9 17:08:23 2013
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
@@ -33,6 +34,8 @@
namespace qpid {
namespace ha {
+class HaBroker;
+
// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
// against bad promotion if there are READY brokers when this broker starts.
// It will not help the situation where brokers became READY after this one starts.
@@ -51,7 +54,7 @@ namespace ha {
class StatusCheck
{
public:
- StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
+ StatusCheck(HaBroker&);
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
@@ -59,12 +62,11 @@ class StatusCheck
private:
void setPromote(bool p);
- std::string logPrefix;
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- sys::Duration linkHeartbeatInterval;
- BrokerInfo brokerInfo;
+ HaBroker& haBroker;
+
friend class StatusCheckThread;
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Mon Sep 9 17:08:23 2013
@@ -222,7 +222,8 @@ void TxReplicator::members(const string&
if (!e.members.count(haBroker.getMembership().getSelf())) {
QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
// Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- haBroker.deleteQueue(getQueue()->getName());
+ haBroker.getBroker().deleteQueue(
+ getQueue()->getName(), haBroker.getUserId(), string());
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Sep 9 17:08:23 2013
@@ -125,7 +125,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
+ "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1521190&r1=1521189&r2=1521190&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Sep 9 17:08:23 2013
@@ -565,6 +565,7 @@ class ReplicationTests(HaBrokerTest):
# Verify that replication works with auth=yes and HA user has at least the following
# privileges:
aclf.write("""
+# HA user
acl allow zag@QPID access queue
acl allow zag@QPID create queue
acl allow zag@QPID consume queue
@@ -576,6 +577,9 @@ acl allow zag@QPID publish exchange
acl allow zag@QPID delete exchange
acl allow zag@QPID access method
acl allow zag@QPID create link
+# Normal user
+acl allow zig@QPID all all
+
acl deny all all
""")
aclf.close()
@@ -586,14 +590,16 @@ acl deny all all
"--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
],
client_credentials=Credentials("zag", "zag", "PLAIN"))
- s0 = cluster[0].connect(username="zag", password="zag").session();
+ c = cluster[0].connect(username="zig", password="zig")
+ s0 = c.session();
s0.receiver("q;{create:always}")
s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
- cluster[1].wait_backup("q")
- cluster[1].wait_backup("ex")
- s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
- s1.sender("ex").send("foo");
- self.assertEqual(s1.receiver("q").fetch().content, "foo")
+ s0.sender("ex").send("foo");
+ s1 = c.session(transactional=True)
+ s1.sender("ex").send("tx");
+ cluster[1].assert_browse_backup("q", ["foo"])
+ s1.commit()
+ cluster[1].assert_browse_backup("q", ["foo", "tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org