You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/09 19:08:40 UTC
svn commit: r1521192 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/
Author: aconway
Date: Mon Sep 9 17:08:39 2013
New Revision: 1521192
URL: http://svn.apache.org/r1521192
Log:
QPID-4327: HA support for TX transactions - fix TX error messages.
- Ignore un-replicated queues when replicating transactions.
- Clean up cancel logic in QueueReplicator, causing "no such subscription" errors.
- Remove unnecessary exchange delete warnings
- ha_test.py: Shorter timeout for starting cluster brokers.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Sep 9 17:08:39 2013
@@ -560,11 +560,7 @@ void BrokerReplicator::doEventExchangeDe
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
- if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.getLevel(*exchange)) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
- } else {
+ if (exchange && replicationTest.getLevel(*exchange)) {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Sep 9 17:08:39 2013
@@ -203,7 +203,7 @@ std::vector<Url> HaBroker::getKnownBroke
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, message);
+ QPID_LOG(critical, "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Mon Sep 9 17:08:39 2013
@@ -79,7 +79,9 @@ class PrimaryTxObserver::Exchange : publ
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true),
+ haBroker(hb), broker(hb.getBroker()),
+ replicationTest(hb.getSettings().replicateDefault.get()),
+ id(true),
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
failed(false), ended(false)
{
@@ -98,8 +100,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
pair<QueuePtr, bool> result =
broker.getQueues().declare(
- TRANSACTION_REPLICATOR_PREFIX+id.str(),
- QueueSettings(/*durable*/false, /*autodelete*/true));
+ exchangeName, QueueSettings(/*durable*/false, /*autodelete*/true));
assert(result.second);
txQueue = result.first;
txQueue->deliver(TxMembersEvent(members).message());
@@ -109,25 +110,35 @@ PrimaryTxObserver::~PrimaryTxObserver()
void PrimaryTxObserver::initialize() {
- broker.getExchanges().registerExchange(
- boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
+ boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
+ broker.getExchanges().registerExchange(ex);
}
void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
- enqueues[q] += m.getReplicationId();
- txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
- txQueue->deliver(m);
+ if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+ QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+ enqueues[q] += m.getReplicationId();
+ txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
+ txQueue->deliver(m);
+ }
}
void PrimaryTxObserver::dequeue(
const QueuePtr& q, QueuePosition pos, ReplicationId id)
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
- txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+ if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
+ QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
+ }
+ else {
+ QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
+ << LogMessageId(*q, pos, id));
+ }
}
void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Mon Sep 9 17:08:39 2013
@@ -23,7 +23,7 @@
*/
#include "types.h"
-
+#include "ReplicationTest.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
@@ -97,6 +97,7 @@ class PrimaryTxObserver : public broker:
std::string logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
+ ReplicationTest replicationTest;
types::Uuid id;
std::string exchangeName;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Sep 9 17:08:39 2013
@@ -112,7 +112,6 @@ QueueReplicator::QueueReplicator(HaBroke
logPrefix("Backup of "+q->getName()+": "),
subscribed(false),
settings(hb.getSettings()),
- destroyed(false),
nextId(0), maxId(0)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
@@ -181,8 +180,7 @@ void QueueReplicator::destroy() {
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
- if (destroyed) return;
- destroyed = true;
+ if (!queue) return; // Already destroyed
QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
@@ -197,7 +195,7 @@ void QueueReplicator::destroy() {
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
Mutex::ScopedLock l(lock);
- if (destroyed) return; // Already destroyed
+ if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -225,14 +223,6 @@ void QueueReplicator::initializeBridge(B
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
-void QueueReplicator::cancel(Mutex::ScopedLock&) {
- if (sessionHandler) {
- // Cancel the replicating subscription.
- AMQP_ServerProxy peer(sessionHandler->out);
- peer.getMessage().cancel(getName());
- }
-}
-
namespace {
template <class T> T decodeContent(Message& m) {
std::string content = m.getContent();
@@ -259,7 +249,7 @@ void QueueReplicator::route(Deliverable&
{
try {
Mutex::ScopedLock l(lock);
- if (destroyed) return;
+ if (!queue) return; // Already destroyed
broker::Message& message(deliverable.getMessage());
string key(message.getRoutingKey());
if (!isEventKey(message.getRoutingKey())) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon Sep 9 17:08:39 2013
@@ -94,7 +94,6 @@ class QueueReplicator : public broker::E
virtual void deliver(const broker::Message&);
virtual void destroy(); // Called when the queue is destroyed.
- void cancel(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -122,7 +121,6 @@ class QueueReplicator : public broker::E
bool subscribed;
const Settings& settings;
- bool destroyed;
PositionMap positions;
ReplicationIdSet idSet; // Set of replicationIds on the queue.
ReplicationId nextId; // ID for next message to arrive.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Sep 9 17:08:39 2013
@@ -41,6 +41,13 @@ namespace ha {
/**
* Test whether something is replicated, taking into account the
* default replication level.
+ *
+ * The primary uses a ReplicationTest with default based on configuration
+ * settings, and marks objects to be replicated with an explict replication
+ * argument.
+ *
+ * The backup uses a default of NONE, so it always accepts what the primary has
+ * marked on the object.
*/
class ReplicationTest
{
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Mon Sep 9 17:08:39 2013
@@ -215,21 +215,24 @@ void TxReplicator::rollback(const string
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
TxMembersEvent e;
decodeStr(data, e);
QPID_LOG(debug, logPrefix << "Members: " << e.members);
if (!e.members.count(haBroker.getMembership().getSelf())) {
QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- haBroker.getBroker().deleteQueue(
- getQueue()->getName(), haBroker.getUserId(), string());
+ end(l);
}
}
-void TxReplicator::end(sys::Mutex::ScopedLock& l) {
+void TxReplicator::end(sys::Mutex::ScopedLock&) {
complete = true;
- cancel(l);
+ if (!getQueue()) return; // Already destroyed
+ // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
+ // Need to do this now to cancel the subscription to the primary tx-queue
+ // which informs the primary that we have completed the transaction.
+ haBroker.getBroker().deleteQueue(
+ getQueue()->getName(), haBroker.getUserId(), string());
}
void TxReplicator::destroy() {
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Sep 9 17:08:39 2013
@@ -125,7 +125,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::", "--log-enable=debug+:acl::",
+ "--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
@@ -323,7 +323,7 @@ class HaCluster(object):
ha_port = self._ports[i]
b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
args=args, **self.kwargs)
- b.ready()
+ b.ready(timeout=5)
return b
def start(self):
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1521192&r1=1521191&r2=1521192&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Sep 9 17:08:39 2013
@@ -562,8 +562,7 @@ class ReplicationTests(HaBrokerTest):
return
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
- # Verify that replication works with auth=yes and HA user has at least the following
- # privileges:
+ # Minimum set of privileges required for the HA user.
aclf.write("""
# HA user
acl allow zag@QPID access queue
@@ -592,14 +591,14 @@ acl deny all all
client_credentials=Credentials("zag", "zag", "PLAIN"))
c = cluster[0].connect(username="zig", password="zig")
s0 = c.session();
- s0.receiver("q;{create:always}")
- s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ s0.sender("q;{create:always}")
+ s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
s0.sender("ex").send("foo");
s1 = c.session(transactional=True)
- s1.sender("ex").send("tx");
+ s1.sender("ex").send("foo-tx");
cluster[1].assert_browse_backup("q", ["foo"])
s1.commit()
- cluster[1].assert_browse_backup("q", ["foo", "tx"])
+ cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org