You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/06/15 22:15:52 UTC
svn commit: r1136170 [2/2] - in /qpid/trunk/qpid/cpp: include/qpid/framing/
src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/qpid/sys/
src/tests/ xml/
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Jun 15 20:15:51 2011
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired();
+ queue.purgeExpired(0);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, timer);
+ QueueCleaner cleaner(queues, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Wed Jun 15 20:15:51 2011
@@ -54,7 +54,7 @@ def filter_log(log):
'caught up',
'active for links|Passivating links|Activating links',
'info Connection.* connected to', # UpdateClient connection
- 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+ 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jun 15 20:15:51 2011
@@ -18,11 +18,12 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try message with TTL and differnet headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -687,6 +694,25 @@ acl allow all all
self.assert_browse(s1, "q", ["foo"])
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -811,7 +837,7 @@ class LongTests(BrokerTest):
endtime = time.time() + self.duration()
# For long duration, first run is a quarter of the duration.
- runtime = max(5, self.duration() / 4.0)
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -853,7 +879,7 @@ class LongTests(BrokerTest):
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
+ cluster_test_logs.verify_logs()
def test_flowlimit_failover(self):
"""Test fail-over during continuous send-receive with flow control
@@ -880,6 +906,7 @@ class LongTests(BrokerTest):
while time.time() < endtime:
for s in senders: s.sender.assert_running()
receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
@@ -889,6 +916,114 @@ class LongTests(BrokerTest):
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
+ cluster_test_logs.verify_logs()
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ sender.connection.detach()
+ receiver.connection.detach()
+ logger.setLevel(log_level)
class StoreTests(BrokerTest):
"""
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1136170&r1=1136169&r2=1136170&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jun 15 20:15:51 2011
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
--
+-
- http://www.apache.org/licenses/LICENSE-2.0
--
+-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -78,10 +78,6 @@
<field name="left" type="vbin16"/> <!-- packed member-id array -->
</control>
- <control name="message-expired" code="0x12">
- <field name="id" type="uint64"/>
- </control>
-
<domain name="error-type" type="uint8" label="Types of error">
<enum>
<choice name="none" value="0"/>
@@ -89,7 +85,7 @@
<choice name="connection" value="2"/>
</enum>
</domain>
-
+
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
@@ -116,6 +112,11 @@
<field name="message" type="vbin32"/>
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x22">
+ <field name="time" type="uint64"/>
+ </control>
+
</class>
<!-- Controls associated with a specific connection. -->
@@ -149,7 +150,7 @@
<!-- Abort a connection that is sending invalid data. -->
<control name="abort" code="0x4"/>
-
+
<!-- Update controls. Sent to a new broker in joining mode.
A connection is updated as followed:
- send the shadow's management ID in shadow-perpare on the update connection
@@ -183,7 +184,7 @@
<field name="position" type="sequence-no"/>
<field name="tag" type="str8"/>
<field name="id" type="sequence-no"/>
- <field name="acquired" type="bit"/> <!--If not set, message follows. -->
+ <field name="acquired" type="bit"/> <!--If not set, message is on update queue. -->
<field name="accepted" type="bit"/>
<field name="cancelled" type="bit"/>
<field name="completed" type="bit"/>
@@ -192,9 +193,9 @@
<field name="enqueued" type="bit"/>
<field name="credit" type="uint32"/>
</control>
-
+
<!-- Tx transaction state. -->
- <control name="tx-start" code="0x12"/>
+ <control name="tx-start" code="0x12"/>
<control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
<control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
<control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
@@ -204,7 +205,7 @@
</control>
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
-
+
<!-- Consumers in the connection's output task -->
<control name="output-task" code="0x19">
<field name="channel" type="uint16"/>
@@ -253,9 +254,6 @@
<!-- Replicate encoded exchanges/queues. -->
<control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
- <!-- Set expiry-id for subsequent messages. -->
- <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
-
<!-- Add a listener to a queue -->
<control name="add-queue-listener" code="0x34">
<field name="queue" type="str8"/>
@@ -289,6 +287,18 @@
<field name="state" type="map"/> <!-- "name"=value -->
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x40">
+ <field name="time" type="uint64"/>
+ </control>
+
+ <!-- Update a queue's dequeue rate -->
+ <control name="queue-dequeue-since-purge-state" code="0x41">
+ <field name="queue" type="str8"/>
+ <field name="dequeueSincePurge" type="uint32"/>
+ </control>
+
+
</class>
</amqp>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org