You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/06/15 22:15:52 UTC

svn commit: r1136170 [2/2] - in /qpid/trunk/qpid/cpp: include/qpid/framing/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/qpid/sys/ src/tests/ xml/

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Jun 15 20:15:51 2011
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
     addMessagesToQueue(10, queue);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
     ::usleep(300*1000);
-    queue.purgeExpired();
+    queue.purgeExpired(0);
     BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
 }
 
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
-    QueueCleaner cleaner(queues, timer);
+    QueueCleaner cleaner(queues, &timer);
     cleaner.start(100 * qpid::sys::TIME_MSEC);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Wed Jun 15 20:15:51 2011
@@ -54,7 +54,7 @@ def filter_log(log):
         'caught up',
         'active for links|Passivating links|Activating links',
         'info Connection.* connected to', # UpdateClient connection
-        'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+        'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
         'warning Broker closed connection: 200, OK',
         'task late',
         'task overran',

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jun 15 20:15:51 2011
@@ -18,11 +18,12 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
 from qpid import datatypes, messaging
 from brokertest import *
 from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
 from threading import Thread, Lock, Condition
 from logging import getLogger
 from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
             destination="amq.direct",
             message=qpid.datatypes.Message(props, "content"))
 
+        # Try messages with TTL and different headers/properties
+        cluster[0].send_message("q", Message(durable=True, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
         # Now update a new member and compare their dumps.
         cluster.start(args=["--test-store-dump", "updatee.dump"])
         assert readfile("direct.dump") == readfile("updatee.dump")
+
         os.remove("direct.dump")
         os.remove("updatee.dump")
 
@@ -687,6 +694,25 @@ acl allow all all
         self.assert_browse(s1, "q", ["foo"])
 
 
+    def test_ttl_consistent(self):
+        """Ensure we don't get inconsistent errors with messages that have TTLs very close together"""
+        messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+        messages.append(Message("x"))
+        cluster = self.cluster(2)
+        sender = cluster[0].connect().session().sender("q;{create:always}")
+
+        def fetch(b):
+            receiver = b.connect().session().receiver("q;{create:always}")
+            while receiver.fetch().content != "x": pass
+
+        for m in messages: sender.send(m, sync=False)
+        for m in messages: sender.send(m, sync=False)
+        fetch(cluster[0])
+        fetch(cluster[1])
+        for m in messages: sender.send(m, sync=False)
+        cluster.start()
+        fetch(cluster[2])
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -811,7 +837,7 @@ class LongTests(BrokerTest):
 
         endtime = time.time() + self.duration()
         # For long duration, first run is a quarter of the duration.
-        runtime = max(5, self.duration() / 4.0)
+        runtime = min(5.0, self.duration() / 3.0)
         alive = 0                       # First live cluster member
         for i in range(len(cluster)): start_clients(cluster[i])
         start_mclients(cluster[alive])
@@ -853,7 +879,7 @@ class LongTests(BrokerTest):
         end = time.time() + self.duration()
         while (time.time() < end):  # Get a management interval
             for i in xrange(1000): cluster[0].connect().close()
-            cluster_test_logs.verify_logs()
+        cluster_test_logs.verify_logs()
 
     def test_flowlimit_failover(self):
         """Test fail-over during continuous send-receive with flow control
@@ -880,6 +906,7 @@ class LongTests(BrokerTest):
         while time.time() < endtime:
             for s in senders: s.sender.assert_running()
             receiver.receiver.assert_running()
+            for b in cluster[i:]: b.ready() # Check if any broker crashed.
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
@@ -889,6 +916,114 @@ class LongTests(BrokerTest):
         receiver.stop()
         for i in range(i, len(cluster)): cluster[i].kill()
 
+    def test_ttl_failover(self):
+        """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+        class Client(StoppableThread):
+
+            def __init__(self, broker):
+                StoppableThread.__init__(self)
+                self.connection = broker.connect(reconnect=True)
+                self.auto_fetch_reconnect_urls(self.connection)
+                self.session = self.connection.session()
+
+            def auto_fetch_reconnect_urls(self, conn):
+                """Replacement for the qpid.messaging.util version, which is noisy"""
+                ssn = conn.session("auto-fetch-reconnect-urls")
+                rcv = ssn.receiver("amq.failover")
+                rcv.capacity = 10
+
+                def main():
+                    while True:
+                        try:
+                            msg = rcv.fetch()
+                            qpid.messaging.util.set_reconnect_urls(conn, msg)
+                            ssn.acknowledge(msg, sync=False)
+                        except messaging.exceptions.LinkClosed: return
+                        except messaging.exceptions.ConnectionError: return
+
+                thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+                thread.setDaemon(True)
+                thread.start()
+
+            def stop(self):
+                StoppableThread.stop(self)
+                self.connection.detach()
+
+        class Sender(Client):
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.sent = 0    # Number of messages _reliably_ sent.
+                self.sender = self.session.sender(address, capacity=1000)
+
+            def send_counted(self, ttl):
+                self.sender.send(Message(str(self.sent), ttl=ttl))
+                self.sent += 1
+
+            def run(self):
+                while not self.stopped:
+                    choice = random.randint(0,4)
+                    if choice == 0: self.send_counted(None) # No ttl
+                    elif choice == 1: self.send_counted(100000) # Large ttl
+                    else: # Small ttl, might expire
+                        self.sender.send(Message("", ttl=random.random()/10))
+                self.sender.send(Message("z"), sync=True) # Chaser.
+
+        class Receiver(Client):
+
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.received = 0 # Number of  non-empty (reliable) messages received.
+                self.receiver = self.session.receiver(address, capacity=1000)
+            def run(self):
+                try:
+                    while True:
+                        m = self.receiver.fetch(1)
+                        if m.content == "z": break
+                        if m.content:   # Ignore unreliable messages
+                            # Ignore duplicates
+                            if int(m.content) == self.received: self.received += 1
+                except Exception,e: self.error = e
+
+        # def test_ttl_failover
+
+        # Original cluster will all be killed so expect exit with failure
+        # Set small purge interval.
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+        # Python client failover produces noisy WARN logs, disable temporarily
+        logger = logging.getLogger()
+        log_level = logger.getEffectiveLevel()
+        logger.setLevel(logging.ERROR)
+        try:
+            # Start sender and receiver threads
+            receiver = Receiver(cluster[0], "q;{create:always}")
+            receiver.start()
+            sender = Sender(cluster[0], "q;{create:always}")
+            sender.start()
+
+            # Kill brokers in a cycle.
+            endtime = time.time() + self.duration()
+            runtime = min(5.0, self.duration() / 4.0)
+            i = 0
+            while time.time() < endtime:
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                b.ready()
+                time.sleep(runtime)
+            sender.stop()
+            receiver.stop()
+            for b in cluster[i:]:
+                b.ready()               # Check it didn't crash
+                b.kill()
+            self.assertEqual(sender.sent, receiver.received)
+            cluster_test_logs.verify_logs()
+        finally:
+            # Detach to avoid slow reconnect attempts during shut-down if test fails.
+            sender.connection.detach()
+            receiver.connection.detach()
+            logger.setLevel(log_level)
 
 class StoreTests(BrokerTest):
     """

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jun 15 20:15:51 2011
@@ -8,9 +8,9 @@
 - to you under the Apache License, Version 2.0 (the
 - "License"); you may not use this file except in compliance
 - with the License.  You may obtain a copy of the License at
-- 
+-
 -   http://www.apache.org/licenses/LICENSE-2.0
-- 
+-
 - Unless required by applicable law or agreed to in writing,
 - software distributed under the License is distributed on an
 - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -78,10 +78,6 @@
       <field name="left" type="vbin16"/> <!-- packed member-id array -->
     </control>
 
-    <control name="message-expired" code="0x12">
-      <field name="id" type="uint64"/>
-    </control>
-    
     <domain name="error-type" type="uint8" label="Types of error">
       <enum>
 	<choice name="none" value="0"/>
@@ -89,7 +85,7 @@
 	<choice name="connection" value="2"/>
       </enum>
     </domain>
-	
+
     <!-- Check for error consistency across the cluster -->
     <control name="error-check" code="0x14">
       <field name="type" type="error-type"/>
@@ -116,6 +112,11 @@
       <field name="message" type="vbin32"/>
     </control>
 
+    <!-- Update the cluster time -->
+    <control name="clock" code="0x22">
+      <field name="time" type="uint64"/>
+    </control>
+
   </class>
 
   <!-- Controls associated with a specific connection. -->
@@ -149,7 +150,7 @@
 
     <!-- Abort a connection that is sending invalid data. -->
     <control name="abort" code="0x4"/>
-    
+
     <!-- Update controls. Sent to a new broker in joining mode.
 	 A connection is updated as followed:
 	 - send the shadow's management ID in shadow-perpare on the update connection
@@ -183,7 +184,7 @@
       <field name="position" type="sequence-no"/>
       <field name="tag" type="str8"/>
       <field name="id" type="sequence-no"/>
-      <field name="acquired" type="bit"/>		       <!--If not set, message follows. -->
+      <field name="acquired" type="bit"/> <!--If not set, message is on update queue. -->
       <field name="accepted" type="bit"/>
       <field name="cancelled" type="bit"/>
       <field name="completed" type="bit"/>
@@ -192,9 +193,9 @@
       <field name="enqueued" type="bit"/>
       <field name="credit" type="uint32"/>
     </control>
-    
+
     <!-- Tx transaction state. -->
-    <control name="tx-start" code="0x12"/> 
+    <control name="tx-start" code="0x12"/>
     <control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
     <control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
     <control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
@@ -204,7 +205,7 @@
     </control>
     <control name="tx-end" code="0x17"/>
     <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
-    
+
     <!-- Consumers in the connection's output task -->
     <control name="output-task" code="0x19">
       <field name="channel" type="uint16"/>
@@ -253,9 +254,6 @@
     <!-- Replicate encoded exchanges/queues. -->
     <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
 
-    <!-- Set expiry-id for subsequent messages. -->
-    <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
-
     <!-- Add a listener to a queue -->
     <control name="add-queue-listener" code="0x34">
       <field name="queue" type="str8"/>
@@ -289,6 +287,18 @@
       <field name="state" type="map"/>    <!-- "name"=value -->
     </control>
 
+    <!-- Update the cluster time -->
+    <control name="clock" code="0x40">
+      <field name="time" type="uint64"/>
+    </control>
+
+    <!-- Update a queue's dequeue rate -->
+    <control name="queue-dequeue-since-purge-state" code="0x41">
+      <field name="queue" type="str8"/>
+      <field name="dequeueSincePurge" type="uint32"/>
+    </control>
+
+
   </class>
 
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org