You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/09/27 23:51:30 UTC

svn commit: r1391232 - in /qpid/trunk/qpid/cpp/src/tests: ha_test.py ha_tests.py

Author: aconway
Date: Thu Sep 27 21:51:30 2012
New Revision: 1391232

URL: http://svn.apache.org/viewvc?rev=1391232&view=rev
Log:
NO-JIRA: Fix logging in ha_tests.py

In order to suppress unwanted warnings from certain tests, the ha_test framework
was actually turning off all python logging.

This patch selectively turns off warnings in specific code regions and then
restores the configured logging level.

Modified:
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1391232&r1=1391231&r2=1391232&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Thu Sep 27 21:51:30 2012
@@ -30,6 +30,18 @@ from uuid import UUID
 
 log = getLogger(__name__)
 
+class LogLevel:
+    """
+    Temporarily change the log settings on the root logger.
+    Used to suppress expected WARN messages from the python client.
+    """
+    def __init__(self, level):
+        self.save_level = getLogger().getEffectiveLevel()
+        getLogger().setLevel(level)
+
+    def restore(self):
+        getLogger().setLevel(self.save_level)
+
 class QmfAgent(object):
     """Access to a QMF broker agent."""
     def __init__(self, address, **kwargs):
@@ -73,7 +85,6 @@ class HaBroker(Broker):
         assert os.path.exists(self.qpid_ha_path)
         self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
         assert os.path.exists(self.qpid_config_path)
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         self.qpid_ha_script=import_script(self.qpid_ha_path)
         self._agent = None
         self.client_credentials = client_credentials

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1391232&r1=1391231&r2=1391232&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Sep 27 21:51:30 2012
@@ -88,42 +88,45 @@ class ReplicationTests(BrokerTest):
             b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])
 
-        primary = HaBroker(self, name="primary")
-        primary.promote()
-        p = primary.connect().session()
-
-        # Create config, send messages before starting the backup, to test catch-up replication.
-        setup(p, "1", primary)
-        backup  = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Create config, send messages after starting the backup, to test steady-state replication.
-        setup(p, "2", primary)
-
-        # Verify the data on the backup
-        b = backup.connect_admin().session()
-        verify(b, "1", p)
-        verify(b, "2", p)
-        # Test a series of messages, enqueue all then dequeue all.
-        s = p.sender(queue("foo","all"))
-        wait_address(b, "foo")
-        msgs = [str(i) for i in range(10)]
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        r = p.receiver("foo")
-        for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
-        p.acknowledge()
-        self.assert_browse_retry(p, "foo", [])
-        self.assert_browse_retry(b, "foo", [])
-
-        # Another series, this time verify each dequeue individually.
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        for i in range(len(msgs)):
-            self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            primary = HaBroker(self, name="primary")
+            primary.promote()
+            p = primary.connect().session()
+
+            # Create config, send messages before starting the backup, to test catch-up replication.
+            setup(p, "1", primary)
+            backup  = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Create config, send messages after starting the backup, to test steady-state replication.
+            setup(p, "2", primary)
+
+            # Verify the data on the backup
+            b = backup.connect_admin().session()
+            verify(b, "1", p)
+            verify(b, "2", p)
+            # Test a series of messages, enqueue all then dequeue all.
+            s = p.sender(queue("foo","all"))
+            wait_address(b, "foo")
+            msgs = [str(i) for i in range(10)]
+            for m in msgs: s.send(Message(m))
+            self.assert_browse_retry(p, "foo", msgs)
+            self.assert_browse_retry(b, "foo", msgs)
+            r = p.receiver("foo")
+            for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
             p.acknowledge()
-            self.assert_browse_retry(p, "foo", msgs[i+1:])
-            self.assert_browse_retry(b, "foo", msgs[i+1:])
+            self.assert_browse_retry(p, "foo", [])
+            self.assert_browse_retry(b, "foo", [])
+
+            # Another series, this time verify each dequeue individually.
+            for m in msgs: s.send(Message(m))
+            self.assert_browse_retry(p, "foo", msgs)
+            self.assert_browse_retry(b, "foo", msgs)
+            for i in range(len(msgs)):
+                self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+                p.acknowledge()
+                self.assert_browse_retry(p, "foo", msgs[i+1:])
+                self.assert_browse_retry(b, "foo", msgs[i+1:])
+        finally: l.restore()
 
     def test_sync(self):
         primary = HaBroker(self, name="primary")
@@ -149,53 +152,59 @@ class ReplicationTests(BrokerTest):
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
-        brokers = HaCluster(self, 3)
-        sender = self.popen(
-            ["qpid-send",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=1000",
-             "--content-string=x"
-             ])
-        receiver = self.popen(
-            ["qpid-receive",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=990",
-             "--timeout=10"
-             ])
-        self.assertEqual(sender.wait(), 0)
-        self.assertEqual(receiver.wait(), 0)
-        expect = [long(i) for i in range(991, 1001)]
-        sn = lambda m: m.properties["sn"]
-        brokers[1].assert_browse_backup("q", expect, transform=sn)
-        brokers[2].assert_browse_backup("q", expect, transform=sn)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            brokers = HaCluster(self, 3)
+            sender = self.popen(
+                ["qpid-send",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=1000",
+                 "--content-string=x"
+                 ])
+            receiver = self.popen(
+                ["qpid-receive",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=990",
+                 "--timeout=10"
+                 ])
+            self.assertEqual(sender.wait(), 0)
+            self.assertEqual(receiver.wait(), 0)
+            expect = [long(i) for i in range(991, 1001)]
+            sn = lambda m: m.properties["sn"]
+            brokers[1].assert_browse_backup("q", expect, transform=sn)
+            brokers[2].assert_browse_backup("q", expect, transform=sn)
+        finally: l.restore()
 
     def test_failover_python(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
-        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Check that backup rejects normal connections
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
         try:
-            backup.connect().session()
-            self.fail("Expected connection to backup to fail")
-        except ConnectionError: pass
-        # Check that admin connections are allowed to backup.
-        backup.connect_admin().close()
-
-        # Test discovery: should connect to primary after reject by backup
-        c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
-        s = c.session()
-        sender = s.sender("q;{create:always}")
-        backup.wait_backup("q")
-        sender.send("foo")
-        primary.kill()
-        assert retry(lambda: not is_running(primary.pid))
-        backup.promote()
-        sender.send("bar")
-        self.assert_browse_retry(s, "q", ["foo", "bar"])
-        c.close()
+            primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+            primary.promote()
+            backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Check that backup rejects normal connections
+            try:
+                backup.connect().session()
+                self.fail("Expected connection to backup to fail")
+            except ConnectionError: pass
+            # Check that admin connections are allowed to backup.
+            backup.connect_admin().close()
+
+            # Test discovery: should connect to primary after reject by backup
+            c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+            s = c.session()
+            sender = s.sender("q;{create:always}")
+            backup.wait_backup("q")
+            sender.send("foo")
+            primary.kill()
+            assert retry(lambda: not is_running(primary.pid))
+            backup.promote()
+            sender.send("bar")
+            self.assert_browse_retry(s, "q", ["foo", "bar"])
+            c.close()
+        finally: l.restore()
 
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
@@ -244,51 +253,57 @@ class ReplicationTests(BrokerTest):
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        primary = HaBroker(self, name="primary", ha_cluster=False)
-        pc = primary.connect()
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-
-        # Set up replication with qpid-ha
-        backup.replicate(primary.host_port(), "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
-
-        # Set up replication with qpid-config
-        ps2 = pc.session().sender("q2;{create:always}")
-        backup.config_replicate(primary.host_port(), "q2");
-        ps2.send("x")
-        backup.assert_browse_backup("q2", ["x"])
-
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            primary = HaBroker(self, name="primary", ha_cluster=False)
+            pc = primary.connect()
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False)
+            br = backup.connect().session().receiver("q;{create:always}")
+
+            # Set up replication with qpid-ha
+            backup.replicate(primary.host_port(), "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+
+            # Set up replication with qpid-config
+            ps2 = pc.session().sender("q2;{create:always}")
+            backup.config_replicate(primary.host_port(), "q2");
+            ps2.send("x")
+            backup.assert_browse_backup("q2", ["x"])
+        finally: l.restore()
 
     def test_queue_replica_failover(self):
         """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
-        cluster = HaCluster(self, 2)
-        primary = cluster[0]
-        pc = cluster.connect(0)
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-        backup.replicate(cluster.url, "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        cluster.bounce(0)
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        cluster.bounce(1)
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            cluster = HaCluster(self, 2)
+            primary = cluster[0]
+            pc = cluster.connect(0)
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False)
+            br = backup.connect().session().receiver("q;{create:always}")
+            backup.replicate(cluster.url, "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            cluster.bounce(0)
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            cluster.bounce(1)
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+            pc.close()
+            br.close()
+        finally: l.restore()
 
     def test_lvq(self):
         """Verify that we replicate to an LVQ correctly"""
@@ -733,56 +748,59 @@ class RecoveryTests(BrokerTest):
         """Verify that the broker holds queues without sufficient backup,
         i.e. does not complete messages sent to those queues."""
 
-        # We don't want backups to time out for this test, set long timeout.
-        cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
-        # Wait for the primary to be ready
-        cluster[0].wait_status("active")
-        # Create a queue before the failure.
-        s1 = cluster.connect(0).session().sender("q1;{create:always}")
-        for b in cluster: b.wait_backup("q1")
-        for i in xrange(100): s1.send(str(i))
-        # Kill primary and 2 backups
-        for i in [0,1,2]: cluster.kill(i, False)
-        cluster[3].promote()    # New primary, backups will be 1 and 2
-        cluster[3].wait_status("recovering")
-
-        def assertSyncTimeout(s):
-            try:
-                s.sync(timeout=.01)
-                self.fail("Expected Timeout exception")
-            except Timeout: pass
-
-        # Create a queue after the failure
-        s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
-        # Verify that messages sent are not completed
-        for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-
-        # Verify we can receive even if sending is on hold:
-        cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
-        # Restart backups, verify queues are released only when both backups are up
-        cluster.restart(1)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-        self.assertEqual(cluster[3].ha_status(), "recovering")
-        cluster.restart(2)
-
-        # Verify everything is up to date and active
-        def settled(sender): sender.sync(); return sender.unsettled() == 0;
-        assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
-        assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
-        cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
-        cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
-        cluster[3].wait_status("active"),
-        s1.session.connection.close()
-        s2.session.connection.close()
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            # We don't want backups to time out for this test, set long timeout.
+            cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+            # Wait for the primary to be ready
+            cluster[0].wait_status("active")
+            # Create a queue before the failure.
+            s1 = cluster.connect(0).session().sender("q1;{create:always}")
+            for b in cluster: b.wait_backup("q1")
+            for i in xrange(100): s1.send(str(i))
+            # Kill primary and 2 backups
+            for i in [0,1,2]: cluster.kill(i, False)
+            cluster[3].promote()    # New primary, backups will be 1 and 2
+            cluster[3].wait_status("recovering")
+
+            def assertSyncTimeout(s):
+                try:
+                    s.sync(timeout=.01)
+                    self.fail("Expected Timeout exception")
+                except Timeout: pass
+
+            # Create a queue after the failure
+            s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+            # Verify that messages sent are not completed
+            for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+
+            # Verify we can receive even if sending is on hold:
+            cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+            # Restart backups, verify queues are released only when both backups are up
+            cluster.restart(1)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+            self.assertEqual(cluster[3].ha_status(), "recovering")
+            cluster.restart(2)
+
+            # Verify everything is up to date and active
+            def settled(sender): sender.sync(); return sender.unsettled() == 0;
+            assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
+            assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
+            cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+            cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+            cluster[3].wait_status("active"),
+            s1.session.connection.close()
+            s2.session.connection.close()
+        finally: l.restore()
 
     def test_expected_backup_timeout(self):
         """Verify that we time-out expected backups and release held queues



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org