You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC

svn commit: r926686 [6/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/ cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/ cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/ cpp/include/qpid/client/amqp0_10/ cpp/incl...

Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Mar 23 18:00:49 2010
@@ -77,7 +77,7 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name supplied in the address " +
+            assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
         
@@ -140,7 +140,7 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name supplied in the address " +
+            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
         
@@ -150,7 +150,7 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name supplied in the address " +
+            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
             
@@ -167,7 +167,7 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name supplied in the address " +
+            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
                     "doesn't resolve to an exchange or a queue"));
         }
         assertFalse("Queue should not be created",(

Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 23 18:00:49 2010
@@ -334,6 +334,7 @@ public class DurableSubscriptionTest ext
         {
             _logger.info("Receive message on consumer 3 :expecting B");
             msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+            assertNotNull(msg);
             assertEquals("B", ((TextMessage) msg).getText());
         }
         

Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java Tue Mar 23 18:00:49 2010
@@ -339,7 +339,7 @@ public class FaultTest extends AbstractX
         {
             assertEquals("Wrong error code: ", XAException.XAER_PROTO, e.errorCode);
         }
-    }
+    }    
 
     /**
      * Strategy:
@@ -355,7 +355,7 @@ public class FaultTest extends AbstractX
         _xaResource.end(xid, XAResource.TMSUCCESS);
         xid = getNewXid();
         _xaResource.start(xid, XAResource.TMNOFLAGS);
-        assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0);
+        assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 1000);
     }
 
     /**
@@ -381,5 +381,29 @@ public class FaultTest extends AbstractX
             assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT, e.errorCode);
         }
     }
+    
+    /**
+     * Strategy:
+     * Set the transaction timeout to 1000
+     */
+    public void testTransactionTimeoutAfterCommit() throws Exception
+    {
+        Xid xid = getNewXid();
+        
+        _xaResource.start(xid, XAResource.TMNOFLAGS);
+        _xaResource.setTransactionTimeout(1000);
+        assertEquals("Wrong timeout", 1000,_xaResource.getTransactionTimeout());
+        
+        //_xaResource.prepare(xid);
+        _xaResource.end(xid, XAResource.TMSUCCESS);
+        _xaResource.commit(xid, true);
+        
+        _xaResource.setTransactionTimeout(2000);
+        assertEquals("Wrong timeout", 2000,_xaResource.getTransactionTimeout());
+        
+        xid = getNewXid();
+        _xaResource.start(xid, XAResource.TMNOFLAGS);
+        assertEquals("Wrong timeout", 2000, _xaResource.getTransactionTimeout());
+    }
 
 }

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes Tue Mar 23 18:00:49 2010
@@ -56,10 +56,13 @@ org.apache.qpid.test.client.timeouts.Syn
 // QPID-1262, QPID-1119 : This test fails occasionally due to potential protocol issue.
 org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
 
-// c++ broker doesn't support priorities, TTL or message bouncing
+// c++ broker doesn't support priorities, message bouncing
 org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
 org.apache.qpid.server.queue.PriorityTest#*
+
+// c++ broker expires messages on delivery or when the queue cleaner thread runs.
 org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL
+org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTLwithDurableSubscription
 
 // QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache
 org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#*

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/Excludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaStandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaTransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/XAExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/clean-dir:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir:805429-821809
+/qpid/trunk/qpid/java/test-profiles/clean-dir:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test_resources/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/test_resources:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/test_resources:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/test_resources:805429-821809
+/qpid/trunk/qpid/java/test-profiles/test_resources:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,3 +1,4 @@
 /qpid/branches/0.5.x-dev/qpid/python:892761,894875
 /qpid/branches/java-network-refactor/qpid/python:805429-825319
 /qpid/branches/qmfv2/qpid/python:902858,902894
+/qpid/trunk/qpid/python:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/python/examples/api/spout
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
 /qpid/branches/qmfv2/qpid/python/examples/api/spout:902858,902894
 /qpid/branches/qpid.rnr/python/examples/api/spout:894071-896158
+/qpid/trunk/qpid/python/examples/api/spout:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in Tue Mar 23 18:00:49 2010
@@ -1,5 +1,18 @@
 ==== topic_publisher.py.out
 ==== topic_subscriber.py.out | remove_uuid | sort
+Messages on 'europe' queue:
+Messages on 'news' queue:
+Messages on 'usa' queue:
+Messages on 'weather' queue:
+Queues created - please start the topic producer
+Subscribing local queue 'local_europe' to europe-'
+Subscribing local queue 'local_news' to news-'
+Subscribing local queue 'local_usa' to usa-'
+Subscribing local queue 'local_weather' to weather-'
+That's all, folks!
+That's all, folks!
+That's all, folks!
+That's all, folks!
 europe.news 0
 europe.news 0
 europe.news 1
@@ -20,19 +33,6 @@ europe.weather 3
 europe.weather 3
 europe.weather 4
 europe.weather 4
-Messages on 'europe' queue:
-Messages on 'news' queue:
-Messages on 'usa' queue:
-Messages on 'weather' queue:
-Queues created - please start the topic producer
-Subscribing local queue 'local_europe' to europe-'
-Subscribing local queue 'local_news' to news-'
-Subscribing local queue 'local_usa' to usa-'
-Subscribing local queue 'local_weather' to weather-'
-That's all, folks!
-That's all, folks!
-That's all, folks!
-That's all, folks!
 usa.news 0
 usa.news 0
 usa.news 1

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py Tue Mar 23 18:00:49 2010
@@ -38,11 +38,10 @@ EXPECT_EXIT_FAIL=2         # Expect to e
 EXPECT_RUNNING=3           # Expect to still be running at end of test
 EXPECT_UNKNOWN=4            # No expectation, don't check exit status.
 
-def is_exe(fpath):
-    return os.path.exists(fpath) and os.access(fpath, os.X_OK)
-
 def find_exe(program):
     """Find an executable in the system PATH"""
+    def is_exe(fpath):
+        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
     dir, name = os.path.split(program)
     if dir:
         if is_exe(program): return program
@@ -144,13 +143,13 @@ class Popen(popen2.Popen3):
         expect - if set verify expectation at end of test.
         drain  - if true (default) drain stdout/stderr to files.
         """
-        assert find_exe(cmd[0])
+        assert find_exe(cmd[0]), "executable not found: "+cmd[0]
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
         popen2.Popen3.__init__(self, self.cmd, True)
         self.expect = expect
         self.was_shutdown = False # Set if we deliberately kill/terminate the process
-        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
+        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
         msg = "Process %s" % self.pname
         self.stdin = ExceptionWrapper(self.tochild, msg)
         self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
@@ -179,6 +178,7 @@ class Popen(popen2.Popen3):
     
     def stop(self):                  # Clean up at end of test.
         self.drain()
+        self.stdin.close()
         if self.expect == EXPECT_UNKNOWN:
             try: self.kill()            # Just make sure its dead
             except: pass
@@ -267,8 +267,9 @@ class Broker(Popen):
         cmd += ["--data-dir", self.datadir]
         Popen.__init__(self, cmd, expect, drain=False)
         test.cleanup_stop(self)
-        self._host = "localhost"
+        self._host = "127.0.0.1"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+        self._log_ready = False
 
     def host(self): return self._host
 
@@ -302,8 +303,8 @@ class Broker(Popen):
         c.close()
     
     def _prep_sender(self, queue, durable, xprops):
-        s = queue + "; {create:always, node-properties:{durable:" + str(durable)
-        if xprops != None: s += ", x-properties:{" + xprops + "}"
+        s = queue + "; {create:always, node:{durable:" + str(durable)
+        if xprops != None: s += ", x-declare:{" + xprops + "}"
         return s + "}}"
 
     def send_message(self, queue, message, durable=True, xprops=None, session=None):
@@ -344,16 +345,17 @@ class Broker(Popen):
 
     def log_ready(self):
         """Return true if the log file exists and contains a broker ready message"""
+        if self._log_ready: return True
         if not os.path.exists(self.log): return False
-        ready_msg = re.compile("notice Broker running")
         f = file(self.log)
         try:
             for l in f:
-                if ready_msg.search(l): return True
+                if "notice Broker running" in l:
+                    self._log_ready = True
+                    return True
             return False
         finally: f.close()
 
-    # FIXME aconway 2010-03-02: rename to wait_ready
     def ready(self):
         """Wait till broker is ready to serve clients"""
         # First make sure the broker is listening by checking the log.
@@ -361,7 +363,7 @@ class Broker(Popen):
             raise Exception("Timed out waiting for broker %s" % self.name)
         # Make a connection, this will wait for extended cluster init to finish.
         try: self.connect().close()
-        except: raise RethrownException("Broker %s failed ready test %s"%self.name)
+        except: raise RethrownException("Broker %s failed ready test"%self.name)
 
 class Cluster:
     """A cluster of brokers in a test."""
@@ -427,6 +429,7 @@ class BrokerTest(TestCase):
         for p in self.stopem:
             try: p.stop()
             except Exception, e: err.append(str(e))
+
         if err: raise Exception("Unexpected process status:\n    "+"\n    ".join(err))
 
     def cleanup_stop(self, stopable):
@@ -446,7 +449,7 @@ class BrokerTest(TestCase):
         if (wait):
             try: b.ready()
             except Exception, e:
-                raise Exception("Failed to start broker %s: %s" % ( b.name, e))
+                raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
     def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):

Propchange: qpid/branches/qmf-devel0.7a/qpid/python/qpid/concurrency.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
 /qpid/branches/qmfv2/qpid/python/qpid/concurrency.py:902858,902894
 /qpid/branches/qpid.rnr/python/qpid/concurrency.py:894071-896158
+/qpid/trunk/qpid/python/qpid/concurrency.py:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py Tue Mar 23 18:00:49 2010
@@ -17,11 +17,16 @@
 # under the License.
 #
 
+__SELF__ = object()
+
 class Constant:
 
-  def __init__(self, name, value=None):
+  def __init__(self, name, value=__SELF__):
     self.name = name
-    self.value = value
+    if value is __SELF__:
+      self.value = self
+    else:
+      self.value = value
 
   def __repr__(self):
     return self.name
@@ -30,3 +35,6 @@ AMQP_PORT = 5672
 AMQPS_PORT = 5671
 
 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py Tue Mar 23 18:00:49 2010
@@ -18,7 +18,7 @@
 #
 
 import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
 from qpid import compat
 from qpid import sasl
 from qpid.concurrency import synchronized
@@ -27,13 +27,13 @@ from qpid.exceptions import Timeout, Ver
 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
     FrameDecoder, SegmentDecoder, OpDecoder
 from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
 from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
 from qpid.ops import *
 from qpid.selector import Selector
 from qpid.util import connect
-from qpid.validator import And, Context, Map, Types, Values
+from qpid.validator import And, Context, List, Map, Types, Values
 from threading import Condition, Thread
 
 log = getLogger("qpid.messaging")
@@ -78,9 +78,8 @@ class Pattern:
     sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
                                binding_key=self.value.replace("*", "#")))
 
-FILTER_DEFAULTS = {
-  "topic": Pattern("*"),
-  "amq.failover": Pattern("DUMMY")
+SUBJECT_DEFAULTS = {
+  "topic": "#"
   }
 
 # XXX
@@ -130,7 +129,14 @@ class SessionState:
     id = self.sent
     self.write_cmd(query, lambda: handler(self.results.pop(id)))
 
-  def write_cmd(self, cmd, action=noop):
+  def apply_overrides(self, cmd, overrides):
+    for k, v in overrides.items():
+      cmd[k.replace('-', '_')] = v
+
+  def write_cmd(self, cmd, action=noop, overrides=None):
+    if overrides:
+      self.apply_overrides(cmd, overrides)
+
     if action != noop:
       cmd.sync = True
     if self.detached:
@@ -154,28 +160,36 @@ class SessionState:
     self.driver.write_op(op)
 
 POLICIES = Values("always", "sender", "receiver", "never")
+RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
+                     "exactly-once")
 
-class Bindings:
-
-  def validate(self, o, ctx):
-    t = ctx.containers[1].get("type", "queue")
-    if t != "queue":
-      return "bindings are only permitted on nodes of type queue"
+DECLARE = Map({}, restricted=False)
+BINDINGS = List(Map({
+      "exchange": Types(basestring),
+      "queue": Types(basestring),
+      "key": Types(basestring),
+      "arguments": Map({}, restricted=False)
+      }))
 
 COMMON_OPTS = {
-    "create": POLICIES,
-    "delete": POLICIES,
-    "assert": POLICIES,
-    "node-properties": Map({
-        "type": Values("queue", "topic"),
-        "durable": Types(bool),
-        "x-properties": Map({
-            "type": Types(basestring),
-            "bindings": And(Types(list), Bindings())
-            },
-            restricted=False)
-        })
-    }
+  "create": POLICIES,
+  "delete": POLICIES,
+  "assert": POLICIES,
+  "node": Map({
+      "type": Values("queue", "topic"),
+      "durable": Types(bool),
+      "x-declare": DECLARE,
+      "x-bindings": BINDINGS
+      }),
+  "link": Map({
+      "name": Types(basestring),
+      "durable": Types(bool),
+      "reliability": RELIABILITY,
+      "x-declare": DECLARE,
+      "x-bindings": BINDINGS,
+      "x-subscribe": Map({}, restricted=False)
+      })
+  }
 
 RECEIVE_MODES = Values("browse", "consume")
 
@@ -196,36 +210,46 @@ class LinkIn:
     _rcv.destination = str(rcv.id)
     sst.destinations[_rcv.destination] = _rcv
     _rcv.draining = False
+    _rcv.on_unlink = []
 
   def do_link(self, sst, rcv, _rcv, type, subtype, action):
+    link_opts = _rcv.options.get("link", {})
+    # XXX: default?
+    reliability = link_opts.get("reliability", "unreliable")
+    declare = link_opts.get("x-declare", {})
+    subscribe = link_opts.get("x-subscribe", {})
     acq_mode = acquire_mode.pre_acquired
 
     if type == "topic":
-      _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
-      sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
-      filter = _rcv.options.get("filter")
-      if _rcv.subject is None and filter is None:
-        f = FILTER_DEFAULTS[subtype]
-      elif _rcv.subject and filter:
-        # XXX
-        raise Exception("can't supply both subject and filter")
-      elif _rcv.subject:
-        # XXX
-        f = Pattern(_rcv.subject)
-      else:
-        f = filter
-      f._bind(sst, _rcv.name, _rcv._queue)
+      default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
+      _rcv._queue = link_opts.get("name", default_name)
+      sst.write_cmd(QueueDeclare(queue=_rcv._queue,
+                                 durable=link_opts.get("durable", False),
+                                 exclusive=True,
+                                 auto_delete=(reliability == "unreliable")),
+                    overrides=declare)
+      _rcv.on_unlink = [QueueDelete(_rcv._queue)]
+      subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
+      sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
+      bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
     elif type == "queue":
       _rcv._queue = _rcv.name
       if _rcv.options.get("mode", "consume") == "browse":
         acq_mode = acquire_mode.not_acquired
+      bindings = get_bindings(link_opts, queue=_rcv._queue)
 
+    sst.write_cmds(bindings)
     sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
-                                   acquire_mode = acq_mode))
+                                   acquire_mode = acq_mode),
+                  overrides=subscribe)
     sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
 
   def do_unlink(self, sst, rcv, _rcv, action=noop):
-    sst.write_cmd(MessageCancel(_rcv.destination), action)
+    link_opts = _rcv.options.get("link", {})
+    reliability = link_opts.get("reliability")
+    cmds = [MessageCancel(_rcv.destination)]
+    cmds.extend(_rcv.on_unlink)
+    sst.write_cmds(cmds, action)
 
   def del_link(self, sst, rcv, _rcv):
     del sst.destinations[_rcv.destination]
@@ -240,13 +264,16 @@ class LinkOut:
     _snd.closing = False
 
   def do_link(self, sst, snd, _snd, type, subtype, action):
+    link_opts = _snd.options.get("link", {})
     if type == "topic":
       _snd._exchange = _snd.name
       _snd._routing_key = _snd.subject
+      bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
     elif type == "queue":
       _snd._exchange = ""
       _snd._routing_key = _snd.name
-    action()
+      bindings = get_bindings(link_opts, queue=_snd.name)
+    sst.write_cmds(bindings, action)
 
   def do_unlink(self, sst, snd, _snd, action=noop):
     action()
@@ -435,6 +462,19 @@ class Driver:
       self._host = (self._host + 1) % len(self._hosts)
       self.close_engine(e)
 
+DEFAULT_DISPOSITION = Disposition(None)
+
+def get_bindings(opts, queue=None, exchange=None, key=None):
+  # One ExchangeBind per "x-bindings" entry; absent fields use the caller's defaults.
+  cmds = []
+  for b in opts.get("x-bindings", []):
+    # Resolve into per-binding names so one entry cannot change the next one's defaults.
+    b_exchange = b.get("exchange", exchange)
+    b_queue = b.get("queue", queue)
+    b_key = b.get("key", key)
+    cmds.append(ExchangeBind(b_queue, b_exchange, b_key, b.get("arguments", {})))
+  return cmds
+
 class Engine:
 
   def __init__(self, connection):
@@ -783,12 +823,6 @@ class Engine:
           err = self.declare(sst, lnk, action)
         else:
           err = ("no such queue: %s" % lnk.name,)
-      elif type == "queue":
-        try:
-          cmds = self.bindings(lnk)
-          sst.write_cmds(cmds, lambda: action(type, subtype))
-        except address.ParseError, e:
-          err = (e,)
       else:
         action(type, subtype)
 
@@ -829,23 +863,21 @@ class Engine:
 
   def declare(self, sst, lnk, action):
     name = lnk.name
-    props = lnk.options.get("node-properties", {})
+    props = lnk.options.get("node", {})
     durable = props.get("durable", DURABLE_DEFAULT)
     type = props.get("type", "queue")
-    xprops = props.get("x-properties", {})
+    declare = props.get("x-declare", {})
 
     if type == "topic":
       cmd = ExchangeDeclare(exchange=name, durable=durable)
+      bindings = get_bindings(props, exchange=name)
     elif type == "queue":
       cmd = QueueDeclare(queue=name, durable=durable)
+      bindings = get_bindings(props, queue=name)
     else:
       raise ValueError(type)
 
-    for f in cmd.FIELDS:
-      if f.name != "arguments" and xprops.has_key(f.name):
-        cmd[f.name] = xprops.pop(f.name)
-    if xprops:
-      cmd.arguments = xprops
+    sst.apply_overrides(cmd, declare)
 
     if type == "topic":
       if cmd.type is None:
@@ -855,11 +887,7 @@ class Engine:
       subtype = None
 
     cmds = [cmd]
-    if type == "queue":
-      try:
-        cmds.extend(self.bindings(lnk))
-      except address.ParseError, e:
-        return (e,)
+    cmds.extend(bindings)
 
     def declared():
       self.address_cache[name] = (type, subtype)
@@ -867,16 +895,6 @@ class Engine:
 
     sst.write_cmds(cmds, declared)
 
-  def bindings(self, lnk):
-    props = lnk.options.get("node-properties", {})
-    xprops = props.get("x-properties", {})
-    bindings = xprops.get("bindings", [])
-    cmds = []
-    for b in bindings:
-      n, s, o = address.parse(b)
-      cmds.append(ExchangeBind(lnk.name, n, s, o))
-    return cmds
-
   def delete(self, sst, name, action):
     def deleted():
       del self.address_cache[name]
@@ -915,19 +933,49 @@ class Engine:
     if ssn.acked:
       messages = [m for m in ssn.acked if m not in sst.acked]
       if messages:
-        # XXX: we're ignoring acks that get lost when disconnected,
-        # could we deal this via some message-id based purge?
-        ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+        ids = RangedSet()
+
+        disposed = [(DEFAULT_DISPOSITION, [])]
+        for m in messages:
+          # XXX: we're ignoring acks that get lost when disconnected,
+          # could we deal this via some message-id based purge?
+          if m._transfer_id is None:
+            continue
+          ids.add(m._transfer_id)
+          disp = m._disposition or DEFAULT_DISPOSITION
+          last, msgs = disposed[-1]
+          if disp.type is last.type and disp.options == last.options:
+            msgs.append(m)
+          else:
+            disposed.append((disp, [m]))
+
         for range in ids:
           sst.executed.add_range(range)
         sst.write_op(SessionCompleted(sst.executed))
-        def ack_ack():
-          for m in messages:
-            ssn.acked.remove(m)
-            if not ssn.transactional:
-              sst.acked.remove(m)
-        sst.write_cmd(MessageAccept(ids), ack_ack)
-        log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+        def ack_acker(msgs):
+          def ack_ack():
+            for m in msgs:
+              ssn.acked.remove(m)
+              if not ssn.transactional:
+                sst.acked.remove(m)
+          return ack_ack
+
+        for disp, msgs in disposed:
+          if not msgs: continue
+          if disp.type is None:
+            op = MessageAccept
+          elif disp.type is RELEASED:
+            op = MessageRelease
+          elif disp.type is REJECTED:
+            op = MessageReject
+          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+                           **disp.options),
+                        ack_acker(msgs))
+          if log.isEnabledFor(DEBUG):
+            for m in msgs:
+              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
         sst.acked.extend(messages)
 
     if ssn.committing and not sst.committing:
@@ -948,7 +996,7 @@ class Engine:
         for range in ids:
           sst.executed.add_range(range)
         sst.write_op(SessionCompleted(sst.executed))
-        sst.write_cmd(MessageRelease(ids))
+        sst.write_cmd(MessageRelease(ids, True))
         sst.write_cmd(TxRollback(), do_rb_ok)
 
       def do_rb_ok():
@@ -1055,8 +1103,11 @@ class Engine:
       if mp.application_headers is None:
         mp.application_headers = {}
       mp.application_headers[TO] = msg.to
-    if msg.durable:
-      dp.delivery_mode = delivery_mode.persistent
+    if msg.durable is not None:
+      if msg.durable:
+        dp.delivery_mode = delivery_mode.persistent
+      else:
+        dp.delivery_mode = delivery_mode.non_persistent
     if msg.priority is not None:
       dp.priority = msg.priority
     if msg.ttl is not None:
@@ -1109,7 +1160,8 @@ class Engine:
     if mp.reply_to is not None:
       msg.reply_to = reply_to2addr(mp.reply_to)
     msg.correlation_id = mp.correlation_id
-    msg.durable = dp.delivery_mode == delivery_mode.persistent
+    if dp.delivery_mode is not None:
+      msg.durable = dp.delivery_mode == delivery_mode.persistent
     msg.priority = dp.priority
     msg.ttl = dp.ttl
     msg.redelivered = dp.redelivered

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 18:00:49 2010
@@ -295,14 +295,29 @@ class Session:
       create: <create-policy>,
       delete: <delete-policy>,
       assert: <assert-policy>,
-      node-properties: {
+      node: {
         type: <node-type>,
         durable: <node-durability>,
-        x-properties: {
-          bindings: ["<exchange>/<key>", ...],
-          <passthrough-key>: <passthrough-value>
-        }
+        x-declare: { ... <queue-declare or exchange-declare overrides> ... },
+        x-bindings: [<binding_1>, ... <binding_n>]
       }
+      link: {
+        name: <link-name>,
+        durable: <link-durability>,
+        reliability: <link-reliability>,
+        x-declare: { ... <queue-declare overrides> ... },
+        x-bindings: [<binding_1>, ... <binding_n>],
+        x-subscribe: { ... <message-subscribe overrides> ... }
+      }
+    }
+
+  Bindings are specified as a map with the following options::
+
+    {
+      exchange: <exchange>,
+      queue: <queue>,
+      key: <key>,
+      arguments: <arguments>
     }
 
   The create, delete, and assert policies specify who should perfom
@@ -316,14 +331,12 @@ class Session:
   The node-type is one of:
 
     - I{topic}: a topic node will default to the topic exchange,
-      x-properties may be used to specify other exchange types
+      x-declare may be used to specify other exchange types
     - I{queue}: this is the default node-type
 
-  The x-properties map permits arbitrary additional keys and values to
-  be specified. These keys and values are passed through when creating
-  a node or asserting facts about an existing node. Any passthrough
-  keys and values that do not match a standard field of the underlying
-  exchange or queue declare command will be sent in the arguments map.
+  The x-declare map permits protocol-specific keys and values to be
+  specified. These keys and values are passed through when creating a
+  node or asserting facts about an existing node.
 
   Examples
   --------
@@ -353,18 +366,18 @@ class Session:
 
   You can customize the properties of the queue::
 
-    my-queue; {create: always, node-properties: {durable: True}}
+    my-queue; {create: always, node: {durable: True}}
 
   You can create a topic instead if you want::
 
-    my-queue; {create: always, node-properties: {type: topic}}
+    my-queue; {create: always, node: {type: topic}}
 
   You can assert that the address resolves to a node with particular
   properties::
 
     my-transient-topic; {
       assert: always,
-      node-properties: {
+      node: {
         type: topic,
         durable: False
       }
@@ -508,7 +521,7 @@ class Session:
       raise Empty
 
   @synchronized
-  def acknowledge(self, message=None, sync=True):
+  def acknowledge(self, message=None, disposition=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
     unacknowledged messages on the session are acknowledged.
@@ -530,6 +543,7 @@ class Session:
           raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
         self._wakeup()
         self._ewait(lambda: len(self.acked) < self.ack_capacity)
+      m._disposition = disposition
       self.unacked.remove(m)
       self.acked.append(m)
 

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py Tue Mar 23 18:00:49 2010
@@ -129,7 +129,7 @@ class Message:
                  "correlation_id", "priority", "ttl"]:
       value = self.__dict__[name]
       if value is not None: args.append("%s=%r" % (name, value))
-    for name in ["durable", "properties"]:
+    for name in ["durable", "redelivered", "properties"]:
       value = self.__dict__[name]
       if value: args.append("%s=%r" % (name, value))
     if self.content_type != get_type(self.content):
@@ -141,4 +141,15 @@ class Message:
         args.append(repr(self.content))
     return "Message(%s)" % ", ".join(args)
 
-__all__ = ["Message"]
+class Disposition:
+
+  def __init__(self, type, **options):
+    self.type = type
+    self.options = options
+
+  def __repr__(self):
+    args = [str(self.type)] + \
+        ["%s=%r" % (k, v) for k, v in self.options.items()]
+    return "Disposition(%s)" % ", ".join(args)
+
+__all__ = ["Message", "Disposition"]

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py Tue Mar 23 18:00:49 2010
@@ -59,6 +59,9 @@ class Base(Test):
     else:
       return "%s[%s, %s]" % (base, count, self.test_id)
 
+  def message(self, base, count = None, **kwargs):
+    return Message(content=self.content(base, count), **kwargs)
+
   def ping(self, ssn):
     PING_Q = 'ping-queue; {create: always, delete: always}'
     # send a message
@@ -70,16 +73,52 @@ class Base(Test):
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None, timeout=0, expected=None):
-    contents = []
+  def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
+    messages = []
     try:
-      while limit is None or len(contents) < limit:
-        contents.append(rcv.fetch(timeout=timeout).content)
+      while limit is None or len(messages) < limit:
+        messages.append(rcv.fetch(timeout=timeout))
     except Empty:
       pass
     if expected is not None:
-      assert expected == contents, "expected %s, got %s" % (expected, contents)
-    return contents
+      self.assertEchos(expected, messages, redelivered)
+    return messages
+
+  def diff(self, m1, m2):
+    result = {}
+    for attr in ("id", "subject", "user_id", "to", "reply_to",
+                 "correlation_id", "durable", "priority", "ttl",
+                 "redelivered", "properties", "content_type",
+                 "content"):
+      a1 = getattr(m1, attr)
+      a2 = getattr(m2, attr)
+      if a1 != a2:
+        result[attr] = (a1, a2)
+    return result
+
+  def assertEcho(self, msg, echo, redelivered=False):
+    if not isinstance(msg, Message) or not isinstance(echo, Message):  # raw/mixed case
+      if isinstance(msg, Message):
+        msg = msg.content
+      if isinstance(echo, Message):
+        echo = echo.content
+      assert msg == echo, "expected %s, got %s" % (msg, echo)  # always compare contents
+    else:
+      delta = self.diff(msg, echo)
+      mttl, ettl = delta.pop("ttl", (0, 0))  # ttl is checked separately below
+      if redelivered:
+        assert echo.redelivered, \
+            "expected %s to be redelivered: %s" % (msg, echo)
+        if delta.has_key("redelivered"):
+          del delta["redelivered"]
+      assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
+      assert mttl >= ettl, "%s, %s" % (mttl, ettl)  # echo's ttl must not exceed the original's
+      assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
+
+  def assertEchos(self, msgs, echoes, redelivered=False):
+    assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
+    for m, e in zip(msgs, echoes):
+      self.assertEcho(m, e, redelivered)
 
   def assertEmpty(self, rcv):
     contents = self.drain(rcv)

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 18:00:49 2010
@@ -227,21 +227,60 @@ class SessionTests(Base):
   def testAcknowledgeAsyncAckCapUNLIMITED(self):
     self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
 
-  def send(self, ssn, queue, base, count=1):
-    snd = ssn.sender(queue, durable=self.durable())
-    contents = []
+  def testRelease(self):
+    msgs = [self.message("testRelease", i) for i in range(3)]
+    snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+    for m in msgs:
+      snd.send(m)
+    rcv = self.ssn.receiver(snd.target)
+    echos = self.drain(rcv, expected=msgs)
+    self.ssn.acknowledge(echos[0])
+    self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+    self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+    self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+    self.drain(rcv, expected=msgs[2:3])
+    self.ssn.acknowledge()
+
+  def testReject(self):
+    msgs = [self.message("testReject", i) for i in range(3)]
+    snd = self.ssn.sender("""
+      test-reject-queue; {
+        create: always,
+        delete: always,
+        node: {
+          x-declare: {
+            alternate-exchange: 'amq.topic'
+          }
+        }
+      }
+""")
+    for m in msgs:
+      snd.send(m)
+    rcv = self.ssn.receiver(snd.target)
+    rej = self.ssn.receiver("amq.topic")
+    echos = self.drain(rcv, expected=msgs)
+    self.ssn.acknowledge(echos[0])
+    self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+    self.ssn.acknowledge(echos[2],
+                         Disposition(REJECTED, code=3, text="test-reject"))
+    self.drain(rej, expected=msgs[1:])
+    self.ssn.acknowledge()
+
+  def send(self, ssn, target, base, count=1):
+    snd = ssn.sender(target, durable=self.durable())
+    messages = []
     for i in range(count):
-      c = self.content(base, i)
+      c = self.message(base, i)
       snd.send(c)
-      contents.append(c)
+      messages.append(c)
     snd.close()
-    return contents
+    return messages
 
   def txTest(self, commit):
     TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
     TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(self.ssn, TX_Q, "txTest", 3)
+    messages = self.send(self.ssn, TX_Q, "txTest", 3)
     txrcv = txssn.receiver(TX_Q)
     txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
     rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
     if commit:
       txssn.commit()
       self.assertEmpty(rcv)
-      assert contents == self.drain(copy_rcv)
+      self.drain(copy_rcv, expected=messages)
     else:
       txssn.rollback()
-      assert contents == self.drain(rcv)
+      self.drain(rcv, expected=messages, redelivered=True)
       self.assertEmpty(copy_rcv)
     self.ssn.acknowledge()
 
@@ -271,13 +310,13 @@ class SessionTests(Base):
   def txTestSend(self, commit):
     TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+    messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
     rcv = self.ssn.receiver(TX_SEND_Q)
     self.assertEmpty(rcv)
 
     if commit:
       txssn.commit()
-      assert contents == self.drain(rcv)
+      self.drain(rcv, expected=messages)
       self.ssn.acknowledge()
     else:
       txssn.rollback()
@@ -297,18 +336,17 @@ class SessionTests(Base):
     txssn = self.conn.session(transactional=True)
     txrcv = txssn.receiver(TX_ACK_QC)
     self.assertEmpty(txrcv)
-    contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
-    assert contents == self.drain(txrcv)
+    messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+    self.drain(txrcv, expected=messages)
 
     if commit:
       txssn.acknowledge()
     else:
       txssn.rollback()
-      drained = self.drain(txrcv)
-      assert contents == drained, "expected %s, got %s" % (contents, drained)
+      self.drain(txrcv, expected=messages, redelivered=True)
       txssn.acknowledge()
       txssn.rollback()
-      assert contents == self.drain(txrcv)
+      self.drain(txrcv, expected=messages, redelivered=True)
       txssn.commit() # commit without ack
       self.assertEmpty(txrcv)
 
@@ -316,7 +354,7 @@ class SessionTests(Base):
 
     txssn = self.conn.session(transactional=True)
     txrcv = txssn.receiver(TX_ACK_QC)
-    assert contents == self.drain(txrcv)
+    self.drain(txrcv, expected=messages, redelivered=True)
     txssn.acknowledge()
     txssn.commit()
     rcv = self.ssn.receiver(TX_ACK_QD)
@@ -477,7 +515,7 @@ class ReceiverTests(Base):
     snd = self.ssn.sender("""test-double-close; {
   create: always,
   delete: sender,
-  node-properties: {
+  node: {
     type: topic
   }
 }
@@ -533,9 +571,9 @@ class AddressTests(Base):
       assert "error in options: %s" % error == str(e), e
 
   def testIllegalKey(self):
-    self.badOption("{create: always, node-properties: "
+    self.badOption("{create: always, node: "
                    "{this-property-does-not-exist: 3}}",
-                   "node-properties: this-property-does-not-exist: "
+                   "node: this-property-does-not-exist: "
                    "illegal key")
 
   def testWrongValue(self):
@@ -543,23 +581,17 @@ class AddressTests(Base):
                    "('always', 'sender', 'receiver', 'never')")
 
   def testWrongType1(self):
-    self.badOption("{node-properties: asdf}",
-                   "node-properties: asdf is not a map")
+    self.badOption("{node: asdf}",
+                   "node: asdf is not a map")
 
   def testWrongType2(self):
-    self.badOption("{node-properties: {durable: []}}",
-                   "node-properties: durable: [] is not a bool")
-
-  def testNonQueueBindings(self):
-    self.badOption("{node-properties: {type: topic, x-properties: "
-                   "{bindings: []}}}",
-                   "node-properties: x-properties: bindings: "
-                   "bindings are only permitted on nodes of type queue")
+    self.badOption("{node: {durable: []}}",
+                   "node: durable: [] is not a bool")
 
   def testCreateQueue(self):
     snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
-                          "node-properties: {type: queue, durable: False, "
-                          "x-properties: {auto_delete: true}}}")
+                          "node: {type: queue, durable: False, "
+                          "x-declare: {auto_delete: true}}}")
     content = self.content("testCreateQueue")
     snd.send(content)
     rcv = self.ssn.receiver("test-create-queue")
@@ -569,10 +601,10 @@ class AddressTests(Base):
     addr = """test-create-exchange; {
                 create: always,
                 delete: always,
-                node-properties: {
+                node: {
                   type: topic,
                   durable: False,
-                  x-properties: {auto_delete: true, %s}
+                  x-declare: {auto_delete: true, %s}
                 }
               }""" % props
     snd = self.ssn.sender(addr)
@@ -639,15 +671,15 @@ class AddressTests(Base):
     # XXX: need to figure out close after error
     self.conn._remove_session(self.ssn)
 
-  def testBindings(self):
+  def testNodeBindingsQueue(self):
     snd = self.ssn.sender("""
-test-bindings-queue; {
+test-node-bindings-queue; {
   create: always,
   delete: always,
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
-    }
+  node: {
+    x-bindings: [{exchange: "amq.topic", key: "a.#"},
+                 {exchange: "amq.direct", key: "b"},
+                 {exchange: "amq.topic", key: "c.*"}]
   }
 }
 """)
@@ -658,49 +690,80 @@ test-bindings-queue; {
     snd_a.send("two")
     snd_b.send("three")
     snd_c.send("four")
-    rcv = self.ssn.receiver("test-bindings-queue")
+    rcv = self.ssn.receiver("test-node-bindings-queue")
     self.drain(rcv, expected=["one", "two", "three", "four"])
 
-  def testBindingsAdditive(self):
-    m1 = self.content("testBindingsAdditive", 1)
-    m2 = self.content("testBindingsAdditive", 2)
-    m3 = self.content("testBindingsAdditive", 3)
-    m4 = self.content("testBindingsAdditive", 4)
-
+  def testNodeBindingsTopic(self):
+    rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+    rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+    rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+    rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
     snd = self.ssn.sender("""
-test-bindings-additive-queue; {
+test-node-bindings-topic; {
   create: always,
   delete: always,
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/a"]
-    }
+  node: {
+    type: topic,
+    x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+                 {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+                 {queue: test-node-bindings-topic-queue-b, key: "b"},
+                 {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
   }
 }
 """)
+    m1 = Message("one")
+    m2 = Message(subject="a.foo", content="two")
+    m3 = Message(subject="b", content="three")
+    m4 = Message(subject="c.bar", content="four")
+    snd.send(m1)
+    snd.send(m2)
+    snd.send(m3)
+    snd.send(m4)
+    self.drain(rcv, expected=[m1, m2, m3, m4])
+    self.drain(rcv_a, expected=[m2])
+    self.drain(rcv_b, expected=[m3])
+    self.drain(rcv_c, expected=[m4])
+
+  def testLinkBindings(self):
+    m_a = self.message("testLinkBindings", 1, subject="a")
+    m_b = self.message("testLinkBindings", 2, subject="b")
 
-    snd_a = self.ssn.sender("amq.topic/a")
-    snd_b = self.ssn.sender("amq.topic/b")
+    self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+    snd = self.ssn.sender("amq.topic")
 
-    snd_a.send(m1)
-    snd_b.send(m2)
+    snd.send(m_a)
+    snd.send(m_b)
+    snd.close()
 
-    rcv = self.ssn.receiver("test-bindings-additive-queue")
-    self.drain(rcv, expected=[m1])
+    rcv = self.ssn.receiver("test-link-bindings-queue")
+    self.assertEmpty(rcv)
+
+    snd = self.ssn.sender("""
+amq.topic; {
+  link: {
+    x-bindings: [{queue: test-link-bindings-queue, key: a}]
+  }
+}
+""")
 
-    new_snd = self.ssn.sender("""
-test-bindings-additive-queue; {
-  node-properties: {
-    x-properties: {
-      bindings: ["amq.topic/b"]
-    }
+    snd.send(m_a)
+    snd.send(m_b)
+
+    self.drain(rcv, expected=[m_a])
+    rcv.close()
+
+    rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+  link: {
+    x-bindings: [{exchange: "amq.topic", key: b}]
   }
 }
 """)
 
-    new_snd.send(m3)
-    snd_b.send(m4)
-    self.drain(rcv, expected=[m3, m4])
+    snd.send(m_a)
+    snd.send(m_b)
+
+    self.drain(rcv, expected=[m_a, m_b])
 
   def testSubjectOverride(self):
     snd = self.ssn.sender("amq.topic/a")
@@ -726,6 +789,32 @@ test-bindings-additive-queue; {
     assert e2.subject == "b", "subject: %s" % e2.subject
     self.assertEmpty(rcv)
 
+  def doReliabilityTest(self, reliability, messages, expected):
+    snd = self.ssn.sender("amq.topic")
+    rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+    for m in messages:
+      snd.send(m)
+    self.conn.disconnect()
+    self.conn.connect()
+    self.drain(rcv, expected=expected)
+
+  def testReliabilityUnreliable(self):
+    msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+    self.doReliabilityTest("unreliable", msgs, [])
+
+  def testReliabilityAtLeastOnce(self):
+    msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+    self.doReliabilityTest("at-least-once", msgs, msgs)
+
+  def testLinkName(self):
+    msgs = [self.message("testLinkName", i) for i in range(3)]
+    snd = self.ssn.sender("amq.topic")
+    trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+    qrcv = self.ssn.receiver("test-link-name")
+    for m in msgs:
+      snd.send(m)
+    self.drain(qrcv, expected=msgs)
+
 NOSUCH_Q = "this-queue-should-not-exist"
 UNPARSEABLE_ADDR = "name/subject; {bad options"
 UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -838,8 +927,7 @@ class SenderTests(Base):
     msgs = [self.content("asyncTest", i) for i in range(15)]
     for m in msgs:
       self.snd.send(m, sync=False)
-    drained = self.drain(self.rcv, timeout=self.delay())
-    assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+    self.drain(self.rcv, timeout=self.delay(), expected=msgs)
     self.ssn.acknowledge()
 
   def testSendAsyncCapacity0(self):

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py Tue Mar 23 18:00:49 2010
@@ -68,20 +68,7 @@ class MessageEchoTests(Base):
   def check(self, msg):
     self.snd.send(msg)
     echo = self.rcv.fetch(0)
-
-    assert msg.id == echo.id
-    assert msg.subject == echo.subject
-    assert msg.user_id == echo.user_id
-    assert msg.to == echo.to
-    assert msg.reply_to == echo.reply_to
-    assert msg.correlation_id == echo.correlation_id
-    assert msg.durable == echo.durable
-    assert msg.priority == echo.priority
-    assert msg.ttl == echo.ttl
-    assert msg.properties == echo.properties
-    assert msg.content_type == echo.content_type
-    assert msg.content == echo.content, "%s, %s" % (msg, echo)
-
+    self.assertEcho(msg, echo)  # comparison delegated to assertEcho (defined outside this view) instead of inline per-field asserts
     self.ssn.acknowledge(echo)
 
   def testStringContent(self):

Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py Tue Mar 23 18:00:49 2010
@@ -54,6 +54,20 @@ class Types:
     else:
       return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types]))
 
+class List:
+  """Validator that accepts a list whose every element passes `condition`."""
+  def __init__(self, condition):
+    self.condition = condition  # element validator; must expose validate(value, ctx)
+
+  def validate(self, o, ctx):  # returns an error string, or None (implicitly) when o is acceptable
+    if not isinstance(o, list):
+      return "%s is not a list" % (o,)  # 1-tuple so a tuple-valued o is formatted rather than unpacked by %
+    # NOTE(review): ctx.push(o) has no matching pop on any path visible here -- confirm the context is meant to stay pushed
+    ctx.push(o)
+    for v in o:
+      err = self.condition.validate(v, ctx)
+      if err: return err  # report only the first failing element
+
 class Map:
 
   def __init__(self, map, restricted=True):

Propchange: qpid/branches/qmf-devel0.7a/qpid/ruby/ext/sasl/extconf.rb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/ruby/ext/sasl/extconf.rb:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py Tue Mar 23 18:00:49 2010
@@ -430,6 +430,16 @@ class HeadersExchangeTests(TestHelper):
         self.myBasicPublish({"irrelevant":0})
         self.assertEmpty(self.q)
 
+    def testMatchVoidValue(self):  # binds on header "name" with a None value under x-match 'any', then checks what reaches the queue
+        self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":None})
+        self.myAssertPublishGet({"name":"fred"})  # both publishes carry a "name" header, each with a different value
+        self.myAssertPublishGet({"name":"bob"})
+
+        # Won't match: neither of these messages carries a "name" header
+        self.myBasicPublish({})
+        self.myBasicPublish({"irrelevant":0})
+        self.assertEmpty(self.q)
+
 
 class MiscellaneousErrorsTests(TestHelper):
     """

Propchange: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,2 +1,3 @@
 /qpid/branches/qmfv2/qpid/python/tests_0-9/queue.py:902858,902894
 /qpid/branches/qpid.rnr/python/tests_0-9/queue.py:894071-896158
+/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:919043-926606



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org