You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/09/14 20:48:09 UTC

svn commit: r1384882 - in /qpid/trunk/qpid/cpp/src: qpid/ha/HaBroker.cpp qpid/ha/Primary.cpp qpid/ha/Primary.h tests/brokertest.py tests/ha_tests.py

Author: aconway
Date: Fri Sep 14 18:48:09 2012
New Revision: 1384882

URL: http://svn.apache.org/viewvc?rev=1384882&view=rev
Log:
QPID-4223: HA Completion isn't sent when queue that has acquired but unacknowledged messages is deleted

- Extended ha_tests.py test_failover_send_receive to kill backups as well as the primary.
- QueueRegistry::destroy was not calling the observer.
- Primary removes disconnected brokers from backups and expectedBackups.
- Primary calls checkReady in all cases where a broker is removed from expectedBackups.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1384882&r1=1384881&r2=1384882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Sep 14 18:48:09 2012
@@ -342,9 +342,12 @@ void HaBroker::addBroker(const BrokerInf
 
 void HaBroker::removeBroker(const Uuid& id) {
     Mutex::ScopedLock l(lock);
-    membership.remove(id);
-    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id);
-    membershipUpdated(l);
+    BrokerInfo info;
+    if (membership.get(id, info)) {
+        membership.remove(id);
+        QPID_LOG(debug, logPrefix << "Membership remove: " <<  info);
+        membershipUpdated(l);
+    }
 }
 
 void HaBroker::setLinkProperties(Mutex::ScopedLock&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1384882&r1=1384881&r2=1384882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Sep 14 18:48:09 2012
@@ -225,19 +225,20 @@ void Primary::opened(broker::Connection&
 }
 
 void Primary::closed(broker::Connection& connection) {
-    // NOTE: It is possible for a backup connection to be rejected while we are
-    // a backup, but closed() is called after we have become primary.
-    //
-    // For this reason we do not remove from the backups map here, the backups
-    // map holds all the backups we know about whether connected or not.
-    //
-    Mutex::ScopedLock l(lock);
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
-        QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
-        haBroker.removeBroker(info.getSystemId());
+        Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
-        if (i != backups.end()) i->second->setConnected(false);
+        // NOTE: It is possible for a backup connection to be rejected while we
+        // are a backup, but closed() is called after we have become primary.
+        // Checking  isConnected() lets us ignore such spurious closes.
+        if (i != backups.end() && i->second->isConnected()) {
+            QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
+            haBroker.removeBroker(info.getSystemId());
+            expectedBackups.erase(i->second);
+            backups.erase(i);
+            checkReady(l);
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1384882&r1=1384881&r2=1384882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Fri Sep 14 18:48:09 2012
@@ -95,13 +95,12 @@ class Primary
     bool active;
     /**
      * Set of expected backups that must be ready before we declare ourselves
-     * active
+     * active. These are backups that were known before the primary crashed. As
+     * new primary we expect them to re-connect.
      */
     BackupSet expectedBackups;
     /**
-     * Map of all the remote backups we know about: any expected backups plus
-     * all actual backups that have connected. We do not remove entries when a
-     * backup disconnects. @see Primary::closed()
+     * Map of all the expected backups plus all connected backups.
      */
     BackupMap backups;
     boost::shared_ptr<broker::ConnectionObserver> connectionObserver;

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1384882&r1=1384881&r2=1384882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Sep 14 18:48:09 2012
@@ -565,7 +565,7 @@ class NumberedSender(Thread):
 
     def __init__(self, broker, max_depth=None, queue="test-queue",
                  connection_options=Cluster.CONNECTION_OPTIONS,
-                 failover_updates=True, url=None):
+                 failover_updates=True, url=None, args=[]):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
         Requires self.notify_received(n) to be called each time messages are received.
@@ -576,7 +576,7 @@ class NumberedSender(Thread):
                "--address", "%s;{create:always}"%queue,
                "--connection-options", "{%s}"%(connection_options),
                "--content-stdin"
-               ]
+               ] + args
         if failover_updates: cmd += ["--failover-updates"]
         self.sender = broker.test.popen(
             cmd, expect=EXPECT_RUNNING, stdin=PIPE)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1384882&r1=1384881&r2=1384882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Sep 14 18:48:09 2012
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
 import traceback
 from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4
@@ -114,7 +114,8 @@ class HaBroker(Broker):
                 self._status = self.ha_status()
                 return self._status == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
+        assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+            self, status, self._status)
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -887,29 +888,54 @@ class LongTests(BrokerTest):
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
+        primary = 0
         try:
             while time.time() < endtime or i < 3: # At least 3 iterations
+                # Precondition: All 3 brokers running,
+                # primary = index of promoted primary
+                # one or two backups are running,
                 for s in senders: s.sender.assert_running()
                 for r in receivers: r.receiver.assert_running()
-                checkpoint = [ r.received for r in receivers ]
-                # Don't kill primary till it is active and the next
-                # backup is ready, otherwise we can lose messages.
-                brokers[i%3].wait_status("active")
-                brokers[(i+1)%3].wait_status("ready")
-                brokers.bounce(i%3)
+                checkpoint = [ r.received+100 for r in receivers ]
+                dead = None
+                victim = random.randint(0,2)
+                if victim == primary:
+                    # Don't kill primary till it is active and the next
+                    # backup is ready, otherwise we can lose messages.
+                    brokers[victim].wait_status("active")
+                    next = (victim+1)%3
+                    brokers[next].wait_status("ready")
+                    brokers.bounce(victim) # Next one is promoted
+                    primary = next
+                else:
+                    brokers.kill(victim, False)
+                    dead = victim
+
+                # At this point the primary is running with 1 or 2 backups
+                # Make sure we are not stalled
+                map(wait_passed, receivers, checkpoint)
+                # Run another checkpoint to ensure things work in this configuration
+                checkpoint = [ r.received+100 for r in receivers ]
+                map(wait_passed, receivers, checkpoint)
+
+                if dead is not None:
+                    brokers.restart(dead) # Restart backup
+                    brokers[dead].ready(client_properties={"qpid.ha-admin":1})
+                    dead = None
                 i += 1
-                map(wait_passed, receivers, checkpoint) # Wait for all receivers
         except:
             traceback.print_exc()
             raise
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            dead = []
+            unexpected_dead = []
             for i in xrange(3):
-                if not brokers[i].is_running(): dead.append(i)
-                brokers.kill(i, False)
-            if dead: raise Exception("Brokers not running: %s"%dead)
+                if not brokers[i].is_running() and i != dead:
+                    unexpected_dead.append(i)
+                if brokers[i].is_running(): brokers.kill(i, False)
+            if unexpected_dead:
+                raise Exception("Brokers not running: %s"%unexpected_dead)
 
 class RecoveryTests(BrokerTest):
     """Tests for recovery after a failure."""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org