You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/12 15:38:09 UTC

svn commit: r1213258 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/broker/DeliveryRecord.cpp qpid/broker/Queue.cpp qpid/ha/QueueReplicator.cpp qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h tests/brokertest.py tests/ha_tests.py

Author: aconway
Date: Mon Dec 12 14:38:08 2011
New Revision: 1213258

URL: http://svn.apache.org/viewvc?rev=1213258&view=rev
Log:
QPID-3603: Fix QueueReplicator subscription parameters.

- Queue::destroyed cleans up observers.
- Clean up log messages, comments, some variable names.
- Improvements to brokertest.py

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Dec 12 14:38:08 2011
@@ -118,7 +118,8 @@ bool DeliveryRecord::accept(TransactionC
         } else if (isDelayedCompletion) {
             // FIXME aconway 2011-12-05: This should be done in HA code.
             msg.payload->getIngressCompletion().finishCompleter();
-            QPID_LOG(debug, "Completed " << msg.payload.get());
+            QPID_LOG(debug, "Completed " << msg.queue->getName()
+                     << "[" << msg.position << "]");
         }
         setEnded();
         QPID_LOG(debug, "Accepted " << id);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec 12 14:38:08 2011
@@ -1069,6 +1069,10 @@ void Queue::destroyed()
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
     notifyDeleted();
+    {
+        Mutex::ScopedLock locker(messageLock);
+        observers.clear();
+    }
 }
 
 void Queue::notifyDeleted()

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Dec 12 14:38:08 2011
@@ -83,7 +83,7 @@ void QueueReplicator::initializeBridge(B
     qpid::framing::SequenceNumber oldest;
     if (queue->getOldest(oldest))
         settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
-    peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
+    peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
     QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
@@ -98,6 +98,7 @@ void QueueReplicator::route(Deliverable&
         qpid::framing::SequenceSet latest;
         latest.decode(buffer);
 
+        QPID_LOG(trace, "HA: Backup received dequeues: " << latest);
         //TODO: should be able to optimise the following
         for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
             if (current < *i) {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Dec 12 14:38:08 2011
@@ -48,23 +48,26 @@ class ReplicationStateInitialiser
     ReplicationStateInitialiser(
         qpid::framing::SequenceSet& r,
         const qpid::framing::SequenceNumber& s,
-        const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+        const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e)
     {
-        results.add(start, end);
+        dequeues.add(start, end);
     }
 
     void operator()(const QueuedMessage& message) {
         if (message.position < start) {
             //replica does not have a message that should still be on the queue
             QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+            // FIXME aconway 2011-12-09: we want the replica to dump
+            // its messages and start from scratch in this case.
         } else if (message.position >= start && message.position <= end) {
-            //i.e. message is within the intial range and has not been dequeued, so remove it from the results
-            results.remove(message.position);
+            //i.e. message is within the intial range and has not been dequeued,
+            //so remove it from the dequeues
+            dequeues.remove(message.position);
         } //else message has not been seen by replica yet so can be ignored here
     }
 
   private:
-    qpid::framing::SequenceSet& results;
+    qpid::framing::SequenceSet& dequeues;
     const qpid::framing::SequenceNumber start;
     const qpid::framing::SequenceNumber end;
 };
@@ -94,6 +97,7 @@ ReplicatingSubscription::Factory::create
         rs.reset(new ReplicatingSubscription(
                      parent, name, queue, ack, false, exclusive, tag,
                      resumeId, resumeTtl, arguments));
+        // FIXME aconway 2011-12-08: need to removeObserver also.
         queue->addObserver(rs);
     }
     return rs;
@@ -115,6 +119,12 @@ ReplicatingSubscription::ReplicatingSubs
     events(new Queue(mask(name))),
     consumer(new DelegatingConsumer(*this))
 {
+    // FIXME aconway 2011-12-09: Here we take advantage of existing
+    // messages on the backup queue to reduce replication
+    // effort. However if the backup queue is inconsistent with being
+    // a backup of the primary queue, then we want to issue a warning
+    // and tell the backup to dump its messages and start replicating
+    // from scratch.
     QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
     if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
         qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
@@ -127,19 +137,22 @@ ReplicatingSubscription::ReplicatingSubs
         qpid::framing::SequenceNumber oldest;
         if (queue->getOldest(oldest)) {
             if (oldest >= hwm) {
-                range.add(lwm, --oldest);
+                dequeues.add(lwm, --oldest);
             } else if (oldest >= lwm) {
-                ReplicationStateInitialiser initialiser(range, lwm, hwm);
+                ReplicationStateInitialiser initialiser(dequeues, lwm, hwm);
                 queue->eachMessage(initialiser);
-            } else { //i.e. have older message on master than is reported to exist on replica
-                QPID_LOG(warning, "HA: Replica  missing message on master");
+            } else { //i.e. older message on master than is reported to exist on replica
+                // FIXME aconway 2011-12-09: dump and start from scratch?
+                QPID_LOG(warning, "HA: Replica missing message on primary");
             }
         } else {
             //local queue (i.e. master) is empty
-            range.add(lwm, queue->getPosition());
+            dequeues.add(lwm, queue->getPosition());
+            // FIXME aconway 2011-12-09: if hwm >
+            // queue->getPosition(), dump and start from scratch?
         }
         QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
-                 << range << " (lwm=" << lwm << ", hwm=" << hwm
+                 << dequeues << " (lwm=" << lwm << ", hwm=" << hwm
                  << ", current=" << queue->getPosition() << ")");
         //set position of 'cursor'
         position = hwm;
@@ -162,7 +175,7 @@ ReplicatingSubscription::~ReplicatingSub
 //under the message lock in the queue
 void ReplicatingSubscription::enqueued(const QueuedMessage& m)
 {
-    QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+    QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName());
     //delay completion
     m.payload->getIngressCompletion().startCompleter();
 }
@@ -170,11 +183,11 @@ void ReplicatingSubscription::enqueued(c
 // Called with lock held.
 void ReplicatingSubscription::generateDequeueEvent()
 {
-    QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range);
-    string buf(range.encodedSize(),'\0');
+    QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName());
+    string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
-    range.encode(buffer);
-    range.clear();
+    dequeues.encode(buffer);
+    dequeues.clear();
     buffer.reset();
     //generate event message
     boost::intrusive_ptr<Message> event = new Message();
@@ -199,24 +212,20 @@ void ReplicatingSubscription::generateDe
     events->deliver(event);
 }
 
-// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this?
-// If a queue is drained with no new messages coming on 
-// will the messages be dequeued on the backup?
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
+// Called after the message has been removed from the deque and under
+// the message lock in the queue.
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)
 {
     {
         sys::Mutex::ScopedLock l(lock);
-        range.add(m.position);
-        // FIXME aconway 2011-11-29: q[pos] logging
-        QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+        dequeues.add(m.position);
+        QPID_LOG(trace, "HA: Added " << QueuePos(m)
+                 << " to dequeue event; subscription at " << position);
     }
-    notify();
+    notify();                   // Ensure a call to doDispatch
     if (m.position > position) {
         m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+        QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early, dequeued.");
     }
 }
 
@@ -224,7 +233,7 @@ bool ReplicatingSubscription::doDispatch
 {
     {
         sys::Mutex::ScopedLock l(lock);
-        if (!range.empty()) {
+        if (!dequeues.empty()) {
             generateDequeueEvent();
         }
     }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Dec 12 14:38:08 2011
@@ -80,13 +80,13 @@ class ReplicatingSubscription : public b
     void requeued(const broker::QueuedMessage&) {}
 
     bool isDelayedCompletion() const { return true; }
-    
+
   protected:
     bool doDispatch();
   private:
     boost::shared_ptr<broker::Queue> events;
     boost::shared_ptr<broker::Consumer> consumer;
-    qpid::framing::SequenceSet range;
+    qpid::framing::SequenceSet dequeues;
 
     void generateDequeueEvent();
     class DelegatingConsumer : public Consumer

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/brokertest.py Mon Dec 12 14:38:08 2011
@@ -496,30 +496,30 @@ class BrokerTest(TestCase):
         cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
         return cluster
 
-    def browse(self, session, queue, timeout=0):
+    def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
         """Return a list with the contents of each message on queue."""
         r = session.receiver("%s;{mode:browse}"%(queue))
         r.capacity = 100
         try:
             contents = []
             try:
-                while True: contents.append(r.fetch(timeout=timeout).content)
+                while True: contents.append(transform(r.fetch(timeout=timeout)))
             except messaging.Empty: pass
         finally: r.close()
         return contents
 
-    def assert_browse(self, session, queue, expect_contents, timeout=0):
+    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content):
         """Assert that the contents of messages on queue (as retrieved
         using session and timeout) exactly match the strings in
         expect_contents"""
-        actual_contents = self.browse(session, queue, timeout)
+        actual_contents = self.browse(session, queue, timeout, transform=transform)
         self.assertEqual(expect_contents, actual_contents)
 
-    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
         """Wait up to timeout for contents of queue to match expect_contents"""
-        def test(): return self.browse(session, queue, 0) == expect_contents
+        test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
         retry(test, timeout, delay)
-        self.assertEqual(expect_contents, self.browse(session, queue, 0))
+        self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
 
 def join(thread, timeout=10):
     thread.join(timeout)

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1213258&r1=1213257&r2=1213258&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Mon Dec 12 14:38:08 2011
@@ -65,7 +65,7 @@ class ShortTests(BrokerTest):
             s = p.sender(queue(prefix+"q1", "all"))
             for m in ["a", "b", "1"]: s.send(Message(m))
             # Test replication of dequeue
-            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") 
+            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
             p.acknowledge()
             p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
@@ -82,8 +82,8 @@ class ShortTests(BrokerTest):
             # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
 
-            # FIXME aconway 2011-12-02: 
-            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") 
+            # FIXME aconway 2011-12-02:
+            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
             p.acknowledge()
 
             self.assert_browse_retry(b, prefix+"q2", []) # wiring only
@@ -118,9 +118,12 @@ class ShortTests(BrokerTest):
         self.assert_browse_retry(p, "foo", [])
         self.assert_browse_retry(b, "foo", [])
 
+    def qpid_replicate(self, value="all"):
+        return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
+
     def test_sync(self):
         def queue(name, replicate):
-            return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+            return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
         primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
         p = primary.connect().session()
         s = p.sender(queue("q","all"))
@@ -134,12 +137,47 @@ class ShortTests(BrokerTest):
         s.sync()
         msgs = [str(i) for i in range(30)]
 
-        b = backup1.connect().session()
-        self.assert_browse_retry(b, "q", msgs)
-
-        b = backup2.connect().session()
+        self.assert_browse_retry(backup1.connect().session(), "q", msgs)
         self.assert_browse_retry(backup2.connect().session(), "q", msgs)
 
+    def test_send_receive(self):
+        # FIXME aconway 2011-12-09: test with concurrent senders/receivers.
+        debug = ["-t"]        # FIXME aconway 2011-12-08:
+        primary = self.ha_broker(name="primary", broker_url="primary", args=debug)
+        backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port(), args=debug)
+        backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port(), args=debug)
+        sender = self.popen(
+            ["qpid-send",
+             "--broker", primary.host_port(),
+             "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+             "--messages=1000",         # FIXME aconway 2011-12-09: 
+             "--content-string=x"
+             ])
+        receiver = self.popen(
+            ["qpid-receive",
+             "--broker", primary.host_port(),
+             "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+             "--messages=990",          # FIXME aconway 2011-12-09: 
+             "--timeout=10"
+             ])
+        try:
+            self.assertEqual(sender.wait(), 0)
+            self.assertEqual(receiver.wait(), 0)
+            expect = [long(i) for i in range(991, 1001)]
+            sn = lambda m: m.properties["sn"]
+            self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn)
+            self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn)
+        except:
+            # FIXME aconway 2011-12-09: 
+            print self.browse(primary.connect().session(), "q", transform=sn)
+            print self.browse(backup1.connect().session(), "q", transform=sn)
+            print self.browse(backup2.connect().session(), "q", transform=sn)
+#             os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(primary.host_port()))
+#             print "---- backup1"
+#             os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup1.host_port()))
+#             print "---- backup2"
+#             os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup2.host_port()))
+            raise
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org