You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC
svn commit: r926686 [6/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/
cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/
cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/
cpp/include/qpid/client/amqp0_10/ cpp/incl...
Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Mar 23 18:00:49 2010
@@ -77,7 +77,7 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name supplied in the address " +
+ assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
@@ -140,7 +140,7 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name supplied in the address " +
+ assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
@@ -150,7 +150,7 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name supplied in the address " +
+ assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
@@ -167,7 +167,7 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name supplied in the address " +
+ assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
assertFalse("Queue should not be created",(
Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 23 18:00:49 2010
@@ -334,6 +334,7 @@ public class DurableSubscriptionTest ext
{
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull(msg);
assertEquals("B", ((TextMessage) msg).getText());
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java Tue Mar 23 18:00:49 2010
@@ -339,7 +339,7 @@ public class FaultTest extends AbstractX
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO, e.errorCode);
}
- }
+ }
/**
* Strategy:
@@ -355,7 +355,7 @@ public class FaultTest extends AbstractX
_xaResource.end(xid, XAResource.TMSUCCESS);
xid = getNewXid();
_xaResource.start(xid, XAResource.TMNOFLAGS);
- assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0);
+ assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 1000);
}
/**
@@ -381,5 +381,29 @@ public class FaultTest extends AbstractX
assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT, e.errorCode);
}
}
+
+ /**
+ * Strategy:
+ * Set the transaction timeout to 1000
+ */
+ public void testTransactionTimeoutAfterCommit() throws Exception
+ {
+ Xid xid = getNewXid();
+
+ _xaResource.start(xid, XAResource.TMNOFLAGS);
+ _xaResource.setTransactionTimeout(1000);
+ assertEquals("Wrong timeout", 1000,_xaResource.getTransactionTimeout());
+
+ //_xaResource.prepare(xid);
+ _xaResource.end(xid, XAResource.TMSUCCESS);
+ _xaResource.commit(xid, true);
+
+ _xaResource.setTransactionTimeout(2000);
+ assertEquals("Wrong timeout", 2000,_xaResource.getTransactionTimeout());
+
+ xid = getNewXid();
+ _xaResource.start(xid, XAResource.TMNOFLAGS);
+ assertEquals("Wrong timeout", 2000, _xaResource.getTransactionTimeout());
+ }
}
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes Tue Mar 23 18:00:49 2010
@@ -56,10 +56,13 @@ org.apache.qpid.test.client.timeouts.Syn
// QPID-1262, QPID-1119 : This test fails occasionally due to potential protocol issue.
org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
-// c++ broker doesn't support priorities, TTL or message bouncing
+// c++ broker doesn't support priorities, message bouncing
org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
org.apache.qpid.server.queue.PriorityTest#*
+
+// c++ broker expires messages on delivery or when the queue cleaner thread runs.
org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL
+org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTLwithDurableSubscription
// QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache
org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#*
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/Excludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaStandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaTransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/XAExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/clean-dir:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir:805429-821809
+/qpid/trunk/qpid/java/test-profiles/clean-dir:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test_resources/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/test_resources:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/test_resources:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/test_resources:805429-821809
+/qpid/trunk/qpid/java/test-profiles/test_resources:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
/qpid/branches/0.5.x-dev/qpid/python:892761,894875
/qpid/branches/java-network-refactor/qpid/python:805429-825319
/qpid/branches/qmfv2/qpid/python:902858,902894
+/qpid/trunk/qpid/python:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/python/examples/api/spout
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
/qpid/branches/qmfv2/qpid/python/examples/api/spout:902858,902894
/qpid/branches/qpid.rnr/python/examples/api/spout:894071-896158
+/qpid/trunk/qpid/python/examples/api/spout:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in Tue Mar 23 18:00:49 2010
@@ -1,5 +1,18 @@
==== topic_publisher.py.out
==== topic_subscriber.py.out | remove_uuid | sort
+Messages on 'europe' queue:
+Messages on 'news' queue:
+Messages on 'usa' queue:
+Messages on 'weather' queue:
+Queues created - please start the topic producer
+Subscribing local queue 'local_europe' to europe-'
+Subscribing local queue 'local_news' to news-'
+Subscribing local queue 'local_usa' to usa-'
+Subscribing local queue 'local_weather' to weather-'
+That's all, folks!
+That's all, folks!
+That's all, folks!
+That's all, folks!
europe.news 0
europe.news 0
europe.news 1
@@ -20,19 +33,6 @@ europe.weather 3
europe.weather 3
europe.weather 4
europe.weather 4
-Messages on 'europe' queue:
-Messages on 'news' queue:
-Messages on 'usa' queue:
-Messages on 'weather' queue:
-Queues created - please start the topic producer
-Subscribing local queue 'local_europe' to europe-'
-Subscribing local queue 'local_news' to news-'
-Subscribing local queue 'local_usa' to usa-'
-Subscribing local queue 'local_weather' to weather-'
-That's all, folks!
-That's all, folks!
-That's all, folks!
-That's all, folks!
usa.news 0
usa.news 0
usa.news 1
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py Tue Mar 23 18:00:49 2010
@@ -38,11 +38,10 @@ EXPECT_EXIT_FAIL=2 # Expect to e
EXPECT_RUNNING=3 # Expect to still be running at end of test
EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
-def is_exe(fpath):
- return os.path.exists(fpath) and os.access(fpath, os.X_OK)
-
def find_exe(program):
"""Find an executable in the system PATH"""
+ def is_exe(fpath):
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
dir, name = os.path.split(program)
if dir:
if is_exe(program): return program
@@ -144,13 +143,13 @@ class Popen(popen2.Popen3):
expect - if set verify expectation at end of test.
drain - if true (default) drain stdout/stderr to files.
"""
- assert find_exe(cmd[0])
+ assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
popen2.Popen3.__init__(self, self.cmd, True)
self.expect = expect
self.was_shutdown = False # Set if we deliberately kill/terminate the process
- self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
+ self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
@@ -179,6 +178,7 @@ class Popen(popen2.Popen3):
def stop(self): # Clean up at end of test.
self.drain()
+ self.stdin.close()
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure its dead
except: pass
@@ -267,8 +267,9 @@ class Broker(Popen):
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
- self._host = "localhost"
+ self._host = "127.0.0.1"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._log_ready = False
def host(self): return self._host
@@ -302,8 +303,8 @@ class Broker(Popen):
c.close()
def _prep_sender(self, queue, durable, xprops):
- s = queue + "; {create:always, node-properties:{durable:" + str(durable)
- if xprops != None: s += ", x-properties:{" + xprops + "}"
+ s = queue + "; {create:always, node:{durable:" + str(durable)
+ if xprops != None: s += ", x-declare:{" + xprops + "}"
return s + "}}"
def send_message(self, queue, message, durable=True, xprops=None, session=None):
@@ -344,16 +345,17 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
+ if self._log_ready: return True
if not os.path.exists(self.log): return False
- ready_msg = re.compile("notice Broker running")
f = file(self.log)
try:
for l in f:
- if ready_msg.search(l): return True
+ if "notice Broker running" in l:
+ self._log_ready = True
+ return True
return False
finally: f.close()
- # FIXME aconway 2010-03-02: rename to wait_ready
def ready(self):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
@@ -361,7 +363,7 @@ class Broker(Popen):
raise Exception("Timed out waiting for broker %s" % self.name)
# Make a connection, this will wait for extended cluster init to finish.
try: self.connect().close()
- except: raise RethrownException("Broker %s failed ready test %s"%self.name)
+ except: raise RethrownException("Broker %s failed ready test"%self.name)
class Cluster:
"""A cluster of brokers in a test."""
@@ -427,6 +429,7 @@ class BrokerTest(TestCase):
for p in self.stopem:
try: p.stop()
except Exception, e: err.append(str(e))
+
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
def cleanup_stop(self, stopable):
@@ -446,7 +449,7 @@ class BrokerTest(TestCase):
if (wait):
try: b.ready()
except Exception, e:
- raise Exception("Failed to start broker %s: %s" % ( b.name, e))
+ raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
Propchange: qpid/branches/qmf-devel0.7a/qpid/python/qpid/concurrency.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
/qpid/branches/qmfv2/qpid/python/qpid/concurrency.py:902858,902894
/qpid/branches/qpid.rnr/python/qpid/concurrency.py:894071-896158
+/qpid/trunk/qpid/python/qpid/concurrency.py:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py Tue Mar 23 18:00:49 2010
@@ -17,11 +17,16 @@
# under the License.
#
+__SELF__ = object()
+
class Constant:
- def __init__(self, name, value=None):
+ def __init__(self, name, value=__SELF__):
self.name = name
- self.value = value
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
def __repr__(self):
return self.name
@@ -30,3 +35,6 @@ AMQP_PORT = 5672
AMQPS_PORT = 5671
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py Tue Mar 23 18:00:49 2010
@@ -18,7 +18,7 @@
#
import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
@@ -27,13 +27,13 @@ from qpid.exceptions import Timeout, Ver
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
-from qpid.validator import And, Context, Map, Types, Values
+from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
log = getLogger("qpid.messaging")
@@ -78,9 +78,8 @@ class Pattern:
sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#")))
-FILTER_DEFAULTS = {
- "topic": Pattern("*"),
- "amq.failover": Pattern("DUMMY")
+SUBJECT_DEFAULTS = {
+ "topic": "#"
}
# XXX
@@ -130,7 +129,14 @@ class SessionState:
id = self.sent
self.write_cmd(query, lambda: handler(self.results.pop(id)))
- def write_cmd(self, cmd, action=noop):
+ def apply_overrides(self, cmd, overrides):
+ for k, v in overrides.items():
+ cmd[k.replace('-', '_')] = v
+
+ def write_cmd(self, cmd, action=noop, overrides=None):
+ if overrides:
+ self.apply_overrides(cmd, overrides)
+
if action != noop:
cmd.sync = True
if self.detached:
@@ -154,28 +160,36 @@ class SessionState:
self.driver.write_op(op)
POLICIES = Values("always", "sender", "receiver", "never")
+RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
+ "exactly-once")
-class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
+DECLARE = Map({}, restricted=False)
+BINDINGS = List(Map({
+ "exchange": Types(basestring),
+ "queue": Types(basestring),
+ "key": Types(basestring),
+ "arguments": Map({}, restricted=False)
+ }))
COMMON_OPTS = {
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- }
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS
+ }),
+ "link": Map({
+ "name": Types(basestring),
+ "durable": Types(bool),
+ "reliability": RELIABILITY,
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS,
+ "x-subscribe": Map({}, restricted=False)
+ })
+ }
RECEIVE_MODES = Values("browse", "consume")
@@ -196,36 +210,46 @@ class LinkIn:
_rcv.destination = str(rcv.id)
sst.destinations[_rcv.destination] = _rcv
_rcv.draining = False
+ _rcv.on_unlink = []
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ link_opts = _rcv.options.get("link", {})
+ # XXX: default?
+ reliability = link_opts.get("reliability", "unreliable")
+ declare = link_opts.get("x-declare", {})
+ subscribe = link_opts.get("x-subscribe", {})
acq_mode = acquire_mode.pre_acquired
if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
- filter = _rcv.options.get("filter")
- if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[subtype]
- elif _rcv.subject and filter:
- # XXX
- raise Exception("can't supply both subject and filter")
- elif _rcv.subject:
- # XXX
- f = Pattern(_rcv.subject)
- else:
- f = filter
- f._bind(sst, _rcv.name, _rcv._queue)
+ default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
+ _rcv._queue = link_opts.get("name", default_name)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue,
+ durable=link_opts.get("durable", False),
+ exclusive=True,
+ auto_delete=(reliability == "unreliable")),
+ overrides=declare)
+ _rcv.on_unlink = [QueueDelete(_rcv._queue)]
+ subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
+ sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
+ bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
elif type == "queue":
_rcv._queue = _rcv.name
if _rcv.options.get("mode", "consume") == "browse":
acq_mode = acquire_mode.not_acquired
+ bindings = get_bindings(link_opts, queue=_rcv._queue)
+ sst.write_cmds(bindings)
sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
- acquire_mode = acq_mode))
+ acquire_mode = acq_mode),
+ overrides=subscribe)
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
- sst.write_cmd(MessageCancel(_rcv.destination), action)
+ link_opts = _rcv.options.get("link", {})
+ reliability = link_opts.get("reliability")
+ cmds = [MessageCancel(_rcv.destination)]
+ cmds.extend(_rcv.on_unlink)
+ sst.write_cmds(cmds, action)
def del_link(self, sst, rcv, _rcv):
del sst.destinations[_rcv.destination]
@@ -240,13 +264,16 @@ class LinkOut:
_snd.closing = False
def do_link(self, sst, snd, _snd, type, subtype, action):
+ link_opts = _snd.options.get("link", {})
if type == "topic":
_snd._exchange = _snd.name
_snd._routing_key = _snd.subject
+ bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
- action()
+ bindings = get_bindings(link_opts, queue=_snd.name)
+ sst.write_cmds(bindings, action)
def do_unlink(self, sst, snd, _snd, action=noop):
action()
@@ -435,6 +462,19 @@ class Driver:
self._host = (self._host + 1) % len(self._hosts)
self.close_engine(e)
+DEFAULT_DISPOSITION = Disposition(None)
+
+def get_bindings(opts, queue=None, exchange=None, key=None):
+ bindings = opts.get("x-bindings", [])
+ cmds = []
+ for b in bindings:
+ exchange = b.get("exchange", exchange)
+ queue = b.get("queue", queue)
+ key = b.get("key", key)
+ args = b.get("arguments", {})
+ cmds.append(ExchangeBind(queue, exchange, key, args))
+ return cmds
+
class Engine:
def __init__(self, connection):
@@ -783,12 +823,6 @@ class Engine:
err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
- elif type == "queue":
- try:
- cmds = self.bindings(lnk)
- sst.write_cmds(cmds, lambda: action(type, subtype))
- except address.ParseError, e:
- err = (e,)
else:
action(type, subtype)
@@ -829,23 +863,21 @@ class Engine:
def declare(self, sst, lnk, action):
name = lnk.name
- props = lnk.options.get("node-properties", {})
+ props = lnk.options.get("node", {})
durable = props.get("durable", DURABLE_DEFAULT)
type = props.get("type", "queue")
- xprops = props.get("x-properties", {})
+ declare = props.get("x-declare", {})
if type == "topic":
cmd = ExchangeDeclare(exchange=name, durable=durable)
+ bindings = get_bindings(props, exchange=name)
elif type == "queue":
cmd = QueueDeclare(queue=name, durable=durable)
+ bindings = get_bindings(props, queue=name)
else:
raise ValueError(type)
- for f in cmd.FIELDS:
- if f.name != "arguments" and xprops.has_key(f.name):
- cmd[f.name] = xprops.pop(f.name)
- if xprops:
- cmd.arguments = xprops
+ sst.apply_overrides(cmd, declare)
if type == "topic":
if cmd.type is None:
@@ -855,11 +887,7 @@ class Engine:
subtype = None
cmds = [cmd]
- if type == "queue":
- try:
- cmds.extend(self.bindings(lnk))
- except address.ParseError, e:
- return (e,)
+ cmds.extend(bindings)
def declared():
self.address_cache[name] = (type, subtype)
@@ -867,16 +895,6 @@ class Engine:
sst.write_cmds(cmds, declared)
- def bindings(self, lnk):
- props = lnk.options.get("node-properties", {})
- xprops = props.get("x-properties", {})
- bindings = xprops.get("bindings", [])
- cmds = []
- for b in bindings:
- n, s, o = address.parse(b)
- cmds.append(ExchangeBind(lnk.name, n, s, o))
- return cmds
-
def delete(self, sst, name, action):
def deleted():
del self.address_cache[name]
@@ -915,19 +933,49 @@ class Engine:
if ssn.acked:
messages = [m for m in ssn.acked if m not in sst.acked]
if messages:
- # XXX: we're ignoring acks that get lost when disconnected,
- # could we deal this via some message-id based purge?
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ ids = RangedSet()
+
+ disposed = [(DEFAULT_DISPOSITION, [])]
+ for m in messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal with this via some message-id based purge?
+ if m._transfer_id is None:
+ continue
+ ids.add(m._transfer_id)
+ disp = m._disposition or DEFAULT_DISPOSITION
+ last, msgs = disposed[-1]
+ if disp.type is last.type and disp.options == last.options:
+ msgs.append(m)
+ else:
+ disposed.append((disp, [m]))
+
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- def ack_ack():
- for m in messages:
- ssn.acked.remove(m)
- if not ssn.transactional:
- sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids), ack_ack)
- log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+ def ack_acker(msgs):
+ def ack_ack():
+ for m in msgs:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ return ack_ack
+
+ for disp, msgs in disposed:
+ if not msgs: continue
+ if disp.type is None:
+ op = MessageAccept
+ elif disp.type is RELEASED:
+ op = MessageRelease
+ elif disp.type is REJECTED:
+ op = MessageReject
+ sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+ **disp.options),
+ ack_acker(msgs))
+ if log.isEnabledFor(DEBUG):
+ for m in msgs:
+ log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -948,7 +996,7 @@ class Engine:
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(MessageRelease(ids, True))
sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
@@ -1055,8 +1103,11 @@ class Engine:
if mp.application_headers is None:
mp.application_headers = {}
mp.application_headers[TO] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
+ if msg.durable is not None:
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ else:
+ dp.delivery_mode = delivery_mode.non_persistent
if msg.priority is not None:
dp.priority = msg.priority
if msg.ttl is not None:
@@ -1109,7 +1160,8 @@ class Engine:
if mp.reply_to is not None:
msg.reply_to = reply_to2addr(mp.reply_to)
msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
+ if dp.delivery_mode is not None:
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.priority = dp.priority
msg.ttl = dp.ttl
msg.redelivered = dp.redelivered
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 18:00:49 2010
@@ -295,14 +295,29 @@ class Session:
create: <create-policy>,
delete: <delete-policy>,
assert: <assert-policy>,
- node-properties: {
+ node: {
type: <node-type>,
durable: <node-durability>,
- x-properties: {
- bindings: ["<exchange>/<key>", ...],
- <passthrough-key>: <passthrough-value>
- }
+ x-declare: { ... <queue-declare overrides> ... },
+ x-bindings: [<binding_1>, ... <binding_n>]
}
+ link: {
+ name: <link-name>,
+ durable: <link-durability>,
+ reliability: <link-reliability>,
+ x-declare: { ... <queue-declare overrides> ... },
+ x-bindings: [<binding_1>, ... <binding_n>],
+ x-subscribe: { ... <message-subscribe overrides> ... }
+ }
+ }
+
+ Bindings are specified as a map with the following options::
+
+ {
+ exchange: <exchange>,
+ queue: <queue>,
+ key: <key>,
+ arguments: <arguments>
}
The create, delete, and assert policies specify who should perform
@@ -316,14 +331,12 @@ class Session:
The node-type is one of:
- I{topic}: a topic node will default to the topic exchange,
- x-properties may be used to specify other exchange types
+ x-declare may be used to specify other exchange types
- I{queue}: this is the default node-type
- The x-properties map permits arbitrary additional keys and values to
- be specified. These keys and values are passed through when creating
- a node or asserting facts about an existing node. Any passthrough
- keys and values that do not match a standard field of the underlying
- exchange or queue declare command will be sent in the arguments map.
+ The x-declare map permits protocol-specific keys and values to be
+ specified. These keys and values are passed through when creating a
+ node or asserting facts about an existing node.
Examples
--------
@@ -353,18 +366,18 @@ class Session:
You can customize the properties of the queue::
- my-queue; {create: always, node-properties: {durable: True}}
+ my-queue; {create: always, node: {durable: True}}
You can create a topic instead if you want::
- my-queue; {create: always, node-properties: {type: topic}}
+ my-queue; {create: always, node: {type: topic}}
You can assert that the address resolves to a node with particular
properties::
my-transient-topic; {
assert: always,
- node-properties: {
+ node: {
type: topic,
durable: False
}
@@ -508,7 +521,7 @@ class Session:
raise Empty
@synchronized
- def acknowledge(self, message=None, sync=True):
+ def acknowledge(self, message=None, disposition=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@@ -530,6 +543,7 @@ class Session:
raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
self._wakeup()
self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ m._disposition = disposition
self.unacked.remove(m)
self.acked.append(m)
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py Tue Mar 23 18:00:49 2010
@@ -129,7 +129,7 @@ class Message:
"correlation_id", "priority", "ttl"]:
value = self.__dict__[name]
if value is not None: args.append("%s=%r" % (name, value))
- for name in ["durable", "properties"]:
+ for name in ["durable", "redelivered", "properties"]:
value = self.__dict__[name]
if value: args.append("%s=%r" % (name, value))
if self.content_type != get_type(self.content):
@@ -141,4 +141,15 @@ class Message:
args.append(repr(self.content))
return "Message(%s)" % ", ".join(args)
-__all__ = ["Message"]
+class Disposition:
+
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + \
+ ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+__all__ = ["Message", "Disposition"]
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py Tue Mar 23 18:00:49 2010
@@ -59,6 +59,9 @@ class Base(Test):
else:
return "%s[%s, %s]" % (base, count, self.test_id)
+ def message(self, base, count = None, **kwargs):
+ return Message(content=self.content(base, count), **kwargs)
+
def ping(self, ssn):
PING_Q = 'ping-queue; {create: always, delete: always}'
# send a message
@@ -70,16 +73,52 @@ class Base(Test):
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None, timeout=0, expected=None):
- contents = []
+ def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
+ messages = []
try:
- while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(timeout=timeout).content)
+ while limit is None or len(messages) < limit:
+ messages.append(rcv.fetch(timeout=timeout))
except Empty:
pass
if expected is not None:
- assert expected == contents, "expected %s, got %s" % (expected, contents)
- return contents
+ self.assertEchos(expected, messages, redelivered)
+ return messages
+
+ def diff(self, m1, m2):
+ result = {}
+ for attr in ("id", "subject", "user_id", "to", "reply_to",
+ "correlation_id", "durable", "priority", "ttl",
+ "redelivered", "properties", "content_type",
+ "content"):
+ a1 = getattr(m1, attr)
+ a2 = getattr(m2, attr)
+ if a1 != a2:
+ result[attr] = (a1, a2)
+ return result
+
+ def assertEcho(self, msg, echo, redelivered=False):
+ if not isinstance(msg, Message) or not isinstance(echo, Message):
+ if isinstance(msg, Message):
+ msg = msg.content
+ if isinstance(echo, Message):
+ echo = echo.content
+ assert msg == echo, "expected %s, got %s" % (msg, echo)
+ else:
+ delta = self.diff(msg, echo)
+ mttl, ettl = delta.pop("ttl", (0, 0))
+ if redelivered:
+ assert echo.redelivered, \
+ "expected %s to be redelivered: %s" % (msg, echo)
+ if delta.has_key("redelivered"):
+ del delta["redelivered"]
+ assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
+ assert mttl >= ettl, "%s, %s" % (mttl, ettl)
+ assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
+
+ def assertEchos(self, msgs, echoes, redelivered=False):
+ assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
+ for m, e in zip(msgs, echoes):
+ self.assertEcho(m, e, redelivered)
def assertEmpty(self, rcv):
contents = self.drain(rcv)
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 18:00:49 2010
@@ -227,21 +227,60 @@ class SessionTests(Base):
def testAcknowledgeAsyncAckCapUNLIMITED(self):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-declare: {
+ alternate-exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
for i in range(count):
- c = self.content(base, i)
+ c = self.message(base, i)
snd.send(c)
- contents.append(c)
+ messages.append(c)
snd.close()
- return contents
+ return messages
def txTest(self, commit):
TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
if commit:
txssn.commit()
self.assertEmpty(rcv)
- assert contents == self.drain(copy_rcv)
+ self.drain(copy_rcv, expected=messages)
else:
txssn.rollback()
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=messages, redelivered=True)
self.assertEmpty(copy_rcv)
self.ssn.acknowledge()
@@ -271,13 +310,13 @@ class SessionTests(Base):
def txTestSend(self, commit):
TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
txssn.commit()
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=messages)
self.ssn.acknowledge()
else:
txssn.rollback()
@@ -297,18 +336,17 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- assert contents == self.drain(txrcv)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
if commit:
txssn.acknowledge()
else:
txssn.rollback()
- drained = self.drain(txrcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.rollback()
- assert contents == self.drain(txrcv)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.commit() # commit without ack
self.assertEmpty(txrcv)
@@ -316,7 +354,7 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
- assert contents == self.drain(txrcv)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.commit()
rcv = self.ssn.receiver(TX_ACK_QD)
@@ -477,7 +515,7 @@ class ReceiverTests(Base):
snd = self.ssn.sender("""test-double-close; {
create: always,
delete: sender,
- node-properties: {
+ node: {
type: topic
}
}
@@ -533,9 +571,9 @@ class AddressTests(Base):
assert "error in options: %s" % error == str(e), e
def testIllegalKey(self):
- self.badOption("{create: always, node-properties: "
+ self.badOption("{create: always, node: "
"{this-property-does-not-exist: 3}}",
- "node-properties: this-property-does-not-exist: "
+ "node: this-property-does-not-exist: "
"illegal key")
def testWrongValue(self):
@@ -543,23 +581,17 @@ class AddressTests(Base):
"('always', 'sender', 'receiver', 'never')")
def testWrongType1(self):
- self.badOption("{node-properties: asdf}",
- "node-properties: asdf is not a map")
+ self.badOption("{node: asdf}",
+ "node: asdf is not a map")
def testWrongType2(self):
- self.badOption("{node-properties: {durable: []}}",
- "node-properties: durable: [] is not a bool")
-
- def testNonQueueBindings(self):
- self.badOption("{node-properties: {type: topic, x-properties: "
- "{bindings: []}}}",
- "node-properties: x-properties: bindings: "
- "bindings are only permitted on nodes of type queue")
+ self.badOption("{node: {durable: []}}",
+ "node: durable: [] is not a bool")
def testCreateQueue(self):
snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node-properties: {type: queue, durable: False, "
- "x-properties: {auto_delete: true}}}")
+ "node: {type: queue, durable: False, "
+ "x-declare: {auto_delete: true}}}")
content = self.content("testCreateQueue")
snd.send(content)
rcv = self.ssn.receiver("test-create-queue")
@@ -569,10 +601,10 @@ class AddressTests(Base):
addr = """test-create-exchange; {
create: always,
delete: always,
- node-properties: {
+ node: {
type: topic,
durable: False,
- x-properties: {auto_delete: true, %s}
+ x-declare: {auto_delete: true, %s}
}
}""" % props
snd = self.ssn.sender(addr)
@@ -639,15 +671,15 @@ class AddressTests(Base):
# XXX: need to figure out close after error
self.conn._remove_session(self.ssn)
- def testBindings(self):
+ def testNodeBindingsQueue(self):
snd = self.ssn.sender("""
-test-bindings-queue; {
+test-node-bindings-queue; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
- }
+ node: {
+ x-bindings: [{exchange: "amq.topic", key: "a.#"},
+ {exchange: "amq.direct", key: "b"},
+ {exchange: "amq.topic", key: "c.*"}]
}
}
""")
@@ -658,49 +690,80 @@ test-bindings-queue; {
snd_a.send("two")
snd_b.send("three")
snd_c.send("four")
- rcv = self.ssn.receiver("test-bindings-queue")
+ rcv = self.ssn.receiver("test-node-bindings-queue")
self.drain(rcv, expected=["one", "two", "three", "four"])
- def testBindingsAdditive(self):
- m1 = self.content("testBindingsAdditive", 1)
- m2 = self.content("testBindingsAdditive", 2)
- m3 = self.content("testBindingsAdditive", 3)
- m4 = self.content("testBindingsAdditive", 4)
-
+ def testNodeBindingsTopic(self):
+ rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+ rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+ rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+ rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
snd = self.ssn.sender("""
-test-bindings-additive-queue; {
+test-node-bindings-topic; {
create: always,
delete: always,
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/a"]
- }
+ node: {
+ type: topic,
+ x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+ {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+ {queue: test-node-bindings-topic-queue-b, key: "b"},
+ {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
}
}
""")
+ m1 = Message("one")
+ m2 = Message(subject="a.foo", content="two")
+ m3 = Message(subject="b", content="three")
+ m4 = Message(subject="c.bar", content="four")
+ snd.send(m1)
+ snd.send(m2)
+ snd.send(m3)
+ snd.send(m4)
+ self.drain(rcv, expected=[m1, m2, m3, m4])
+ self.drain(rcv_a, expected=[m2])
+ self.drain(rcv_b, expected=[m3])
+ self.drain(rcv_c, expected=[m4])
+
+ def testLinkBindings(self):
+ m_a = self.message("testLinkBindings", 1, subject="a")
+ m_b = self.message("testLinkBindings", 2, subject="b")
- snd_a = self.ssn.sender("amq.topic/a")
- snd_b = self.ssn.sender("amq.topic/b")
+ self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+ snd = self.ssn.sender("amq.topic")
- snd_a.send(m1)
- snd_b.send(m2)
+ snd.send(m_a)
+ snd.send(m_b)
+ snd.close()
- rcv = self.ssn.receiver("test-bindings-additive-queue")
- self.drain(rcv, expected=[m1])
+ rcv = self.ssn.receiver("test-link-bindings-queue")
+ self.assertEmpty(rcv)
+
+ snd = self.ssn.sender("""
+amq.topic; {
+ link: {
+ x-bindings: [{queue: test-link-bindings-queue, key: a}]
+ }
+}
+""")
- new_snd = self.ssn.sender("""
-test-bindings-additive-queue; {
- node-properties: {
- x-properties: {
- bindings: ["amq.topic/b"]
- }
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a])
+ rcv.close()
+
+ rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+ link: {
+ x-bindings: [{exchange: "amq.topic", key: b}]
}
}
""")
- new_snd.send(m3)
- snd_b.send(m4)
- self.drain(rcv, expected=[m3, m4])
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a, m_b])
def testSubjectOverride(self):
snd = self.ssn.sender("amq.topic/a")
@@ -726,6 +789,32 @@ test-bindings-additive-queue; {
assert e2.subject == "b", "subject: %s" % e2.subject
self.assertEmpty(rcv)
+ def doReliabilityTest(self, reliability, messages, expected):
+ snd = self.ssn.sender("amq.topic")
+ rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+ for m in messages:
+ snd.send(m)
+ self.conn.disconnect()
+ self.conn.connect()
+ self.drain(rcv, expected=expected)
+
+ def testReliabilityUnreliable(self):
+ msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+ self.doReliabilityTest("unreliable", msgs, [])
+
+ def testReliabilityAtLeastOnce(self):
+ msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+ self.doReliabilityTest("at-least-once", msgs, msgs)
+
+ def testLinkName(self):
+ msgs = [self.message("testLinkName", i) for i in range(3)]
+ snd = self.ssn.sender("amq.topic")
+ trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+ qrcv = self.ssn.receiver("test-link-name")
+ for m in msgs:
+ snd.send(m)
+ self.drain(qrcv, expected=msgs)
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -838,8 +927,7 @@ class SenderTests(Base):
msgs = [self.content("asyncTest", i) for i in range(15)]
for m in msgs:
self.snd.send(m, sync=False)
- drained = self.drain(self.rcv, timeout=self.delay())
- assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.drain(self.rcv, timeout=self.delay(), expected=msgs)
self.ssn.acknowledge()
def testSendAsyncCapacity0(self):
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py Tue Mar 23 18:00:49 2010
@@ -68,20 +68,7 @@ class MessageEchoTests(Base):
def check(self, msg):
self.snd.send(msg)
echo = self.rcv.fetch(0)
-
- assert msg.id == echo.id
- assert msg.subject == echo.subject
- assert msg.user_id == echo.user_id
- assert msg.to == echo.to
- assert msg.reply_to == echo.reply_to
- assert msg.correlation_id == echo.correlation_id
- assert msg.durable == echo.durable
- assert msg.priority == echo.priority
- assert msg.ttl == echo.ttl
- assert msg.properties == echo.properties
- assert msg.content_type == echo.content_type
- assert msg.content == echo.content, "%s, %s" % (msg, echo)
-
+ self.assertEcho(msg, echo)
self.ssn.acknowledge(echo)
def testStringContent(self):
Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py Tue Mar 23 18:00:49 2010
@@ -54,6 +54,20 @@ class Types:
else:
return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types]))
+class List:
+
+ def __init__(self, condition):
+ self.condition = condition
+
+ def validate(self, o, ctx):
+ if not isinstance(o, list):
+ return "%s is not a list" % o
+
+ ctx.push(o)
+ for v in o:
+ err = self.condition.validate(v, ctx)
+ if err: return err
+
class Map:
def __init__(self, map, restricted=True):
Propchange: qpid/branches/qmf-devel0.7a/qpid/ruby/ext/sasl/extconf.rb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/ruby/ext/sasl/extconf.rb:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py Tue Mar 23 18:00:49 2010
@@ -430,6 +430,16 @@ class HeadersExchangeTests(TestHelper):
self.myBasicPublish({"irrelevant":0})
self.assertEmpty(self.q)
+ def testMatchVoidValue(self):
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":None})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"bob"})
+
+ # Won't match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
class MiscellaneousErrorsTests(TestHelper):
"""
Propchange: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
/qpid/branches/qmfv2/qpid/python/tests_0-9/queue.py:902858,902894
/qpid/branches/qpid.rnr/python/tests_0-9/queue.py:894071-896158
+/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:919043-926606
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org