You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/12 15:38:09 UTC
svn commit: r1213258 - in /qpid/branches/qpid-3603/qpid/cpp/src:
qpid/broker/DeliveryRecord.cpp qpid/broker/Queue.cpp
qpid/ha/QueueReplicator.cpp qpid/ha/ReplicatingSubscription.cpp
qpid/ha/ReplicatingSubscription.h tests/brokertest.py tests/ha_tests.py
Author: aconway
Date: Mon Dec 12 14:38:08 2011
New Revision: 1213258
URL: http://svn.apache.org/viewvc?rev=1213258&view=rev
Log:
QPID-3603: Fix QueueReplicator subscription parameters.
- Queue::destroyed cleans up observers.
- Clean up log messages, comments, some variable names.
- Improvements to brokertest.py
Modified:
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Dec 12 14:38:08 2011
@@ -118,7 +118,8 @@ bool DeliveryRecord::accept(TransactionC
} else if (isDelayedCompletion) {
// FIXME aconway 2011-12-05: This should be done in HA code.
msg.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << msg.payload.get());
+ QPID_LOG(debug, "Completed " << msg.queue->getName()
+ << "[" << msg.position << "]");
}
setEnded();
QPID_LOG(debug, "Accepted " << id);
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec 12 14:38:08 2011
@@ -1069,6 +1069,10 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observers.clear();
+ }
}
void Queue::notifyDeleted()
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Dec 12 14:38:08 2011
@@ -83,7 +83,7 @@ void QueueReplicator::initializeBridge(B
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
- peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
@@ -98,6 +98,7 @@ void QueueReplicator::route(Deliverable&
qpid::framing::SequenceSet latest;
latest.decode(buffer);
+ QPID_LOG(trace, "HA: Backup received dequeues: " << latest);
//TODO: should be able to optimise the following
for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
if (current < *i) {
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Dec 12 14:38:08 2011
@@ -48,23 +48,26 @@ class ReplicationStateInitialiser
ReplicationStateInitialiser(
qpid::framing::SequenceSet& r,
const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+ const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e)
{
- results.add(start, end);
+ dequeues.add(start, end);
}
void operator()(const QueuedMessage& message) {
if (message.position < start) {
//replica does not have a message that should still be on the queue
QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+ // FIXME aconway 2011-12-09: we want the replica to dump
+ // its messages and start from scratch in this case.
} else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued, so remove it from the results
- results.remove(message.position);
+ //i.e. message is within the intial range and has not been dequeued,
+ //so remove it from the dequeues
+ dequeues.remove(message.position);
} //else message has not been seen by replica yet so can be ignored here
}
private:
- qpid::framing::SequenceSet& results;
+ qpid::framing::SequenceSet& dequeues;
const qpid::framing::SequenceNumber start;
const qpid::framing::SequenceNumber end;
};
@@ -94,6 +97,7 @@ ReplicatingSubscription::Factory::create
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, false, exclusive, tag,
resumeId, resumeTtl, arguments));
+ // FIXME aconway 2011-12-08: need to removeObserver also.
queue->addObserver(rs);
}
return rs;
@@ -115,6 +119,12 @@ ReplicatingSubscription::ReplicatingSubs
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ // FIXME aconway 2011-12-09: Here we take advantage of existing
+ // messages on the backup queue to reduce replication
+ // effort. However if the backup queue is inconsistent with being
+ // a backup of the primary queue, then we want to issue a warning
+ // and tell the backup to dump its messages and start replicating
+ // from scratch.
QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
@@ -127,19 +137,22 @@ ReplicatingSubscription::ReplicatingSubs
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest)) {
if (oldest >= hwm) {
- range.add(lwm, --oldest);
+ dequeues.add(lwm, --oldest);
} else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ ReplicationStateInitialiser initialiser(dequeues, lwm, hwm);
queue->eachMessage(initialiser);
- } else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "HA: Replica missing message on master");
+ } else { //i.e. older message on master than is reported to exist on replica
+ // FIXME aconway 2011-12-09: dump and start from scratch?
+ QPID_LOG(warning, "HA: Replica missing message on primary");
}
} else {
//local queue (i.e. master) is empty
- range.add(lwm, queue->getPosition());
+ dequeues.add(lwm, queue->getPosition());
+ // FIXME aconway 2011-12-09: if hwm >
+ // queue->getPosition(), dump and start from scratch?
}
QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
- << range << " (lwm=" << lwm << ", hwm=" << hwm
+ << dequeues << " (lwm=" << lwm << ", hwm=" << hwm
<< ", current=" << queue->getPosition() << ")");
//set position of 'cursor'
position = hwm;
@@ -162,7 +175,7 @@ ReplicatingSubscription::~ReplicatingSub
//under the message lock in the queue
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
- QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName());
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
@@ -170,11 +183,11 @@ void ReplicatingSubscription::enqueued(c
// Called with lock held.
void ReplicatingSubscription::generateDequeueEvent()
{
- QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range);
- string buf(range.encodedSize(),'\0');
+ QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName());
+ string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
- range.encode(buffer);
- range.clear();
+ dequeues.encode(buffer);
+ dequeues.clear();
buffer.reset();
//generate event message
boost::intrusive_ptr<Message> event = new Message();
@@ -199,24 +212,20 @@ void ReplicatingSubscription::generateDe
events->deliver(event);
}
-// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this?
-// If a queue is drained with no new messages coming on
-// will the messages be dequeued on the backup?
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
+// Called after the message has been removed from the deque and under
+// the message lock in the queue.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
{
sys::Mutex::ScopedLock l(lock);
- range.add(m.position);
- // FIXME aconway 2011-11-29: q[pos] logging
- QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+ dequeues.add(m.position);
+ QPID_LOG(trace, "HA: Added " << QueuePos(m)
+ << " to dequeue event; subscription at " << position);
}
- notify();
+ notify(); // Ensure a call to doDispatch
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early, dequeued.");
}
}
@@ -224,7 +233,7 @@ bool ReplicatingSubscription::doDispatch
{
{
sys::Mutex::ScopedLock l(lock);
- if (!range.empty()) {
+ if (!dequeues.empty()) {
generateDequeueEvent();
}
}
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Dec 12 14:38:08 2011
@@ -80,13 +80,13 @@ class ReplicatingSubscription : public b
void requeued(const broker::QueuedMessage&) {}
bool isDelayedCompletion() const { return true; }
-
+
protected:
bool doDispatch();
private:
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
- qpid::framing::SequenceSet range;
+ qpid::framing::SequenceSet dequeues;
void generateDequeueEvent();
class DelegatingConsumer : public Consumer
Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py Mon Dec 12 14:38:08 2011
@@ -496,30 +496,30 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
- def browse(self, session, queue, timeout=0):
+ def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
contents = []
try:
- while True: contents.append(r.fetch(timeout=timeout).content)
+ while True: contents.append(transform(r.fetch(timeout=timeout)))
except messaging.Empty: pass
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
- actual_contents = self.browse(session, queue, timeout)
+ actual_contents = self.browse(session, queue, timeout, transform=transform)
self.assertEqual(expect_contents, actual_contents)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
"""Wait up to timeout for contents of queue to match expect_contents"""
- def test(): return self.browse(session, queue, 0) == expect_contents
+ test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0))
+ self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
def join(thread, timeout=10):
thread.join(timeout)
Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Mon Dec 12 14:38:08 2011
@@ -65,7 +65,7 @@ class ShortTests(BrokerTest):
s = p.sender(queue(prefix+"q1", "all"))
for m in ["a", "b", "1"]: s.send(Message(m))
# Test replication of dequeue
- self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
p.acknowledge()
p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
@@ -82,8 +82,8 @@ class ShortTests(BrokerTest):
# FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
- # FIXME aconway 2011-12-02:
- self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
+ # FIXME aconway 2011-12-02:
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
p.acknowledge()
self.assert_browse_retry(b, prefix+"q2", []) # wiring only
@@ -118,9 +118,12 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", [])
self.assert_browse_retry(b, "foo", [])
+ def qpid_replicate(self, value="all"):
+ return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
+
def test_sync(self):
def queue(name, replicate):
- return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+ return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
s = p.sender(queue("q","all"))
@@ -134,12 +137,47 @@ class ShortTests(BrokerTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b = backup1.connect().session()
- self.assert_browse_retry(b, "q", msgs)
-
- b = backup2.connect().session()
+ self.assert_browse_retry(backup1.connect().session(), "q", msgs)
self.assert_browse_retry(backup2.connect().session(), "q", msgs)
+ def test_send_receive(self):
+ # FIXME aconway 2011-12-09: test with concurrent senders/receivers.
+ debug = ["-t"] # FIXME aconway 2011-12-08:
+ primary = self.ha_broker(name="primary", broker_url="primary", args=debug)
+ backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port(), args=debug)
+ backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port(), args=debug)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--messages=1000", # FIXME aconway 2011-12-09:
+ "--content-string=x"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--messages=990", # FIXME aconway 2011-12-09:
+ "--timeout=10"
+ ])
+ try:
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn)
+ self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn)
+ except:
+ # FIXME aconway 2011-12-09:
+ print self.browse(primary.connect().session(), "q", transform=sn)
+ print self.browse(backup1.connect().session(), "q", transform=sn)
+ print self.browse(backup2.connect().session(), "q", transform=sn)
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(primary.host_port()))
+# print "---- backup1"
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup1.host_port()))
+# print "---- backup2"
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup2.host_port()))
+ raise
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org