You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/10 17:24:34 UTC

svn commit: r1181019 - /qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Author: kgiusti
Date: Mon Oct 10 15:24:34 2011
New Revision: 1181019

URL: http://svn.apache.org/viewvc?rev=1181019&view=rev
Log:
QPID-3346: fix capacity settings in tests, add transactional sender test

Modified:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1181019&r1=1181018&r2=1181019&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Mon Oct 10 15:24:34 2011
@@ -64,9 +64,9 @@ class MultiConsumerMsgGroupTests(Base):
 
         # create consumers on separate sessions: C1,C2
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         # C1 should acquire A-0, then C2 should acquire B-3
 
@@ -189,9 +189,9 @@ class MultiConsumerMsgGroupTests(Base):
 
         # create consumer and browser
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
 
         # C1 should acquire A-0
 
@@ -242,9 +242,9 @@ class MultiConsumerMsgGroupTests(Base):
             snd.send(m)
 
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
@@ -280,9 +280,9 @@ class MultiConsumerMsgGroupTests(Base):
             snd.send(m)
 
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
@@ -317,9 +317,9 @@ class MultiConsumerMsgGroupTests(Base):
             snd.send(m)
 
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         # C1 will own group A
         m1 = c1.fetch(0)
@@ -383,9 +383,9 @@ class MultiConsumerMsgGroupTests(Base):
             snd.send(m)
 
         s1 = self.conn.session(transactional=True)
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.conn.session(transactional=True)
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         # C1 gets group A
         m1 = c1.fetch(0)
@@ -400,8 +400,8 @@ class MultiConsumerMsgGroupTests(Base):
         s1.acknowledge(m1)  # A-0 consumed, A group freed
         s2.acknowledge(m2)  # B-2 consumed, B group freed
 
-        s1.commit()
-        s2.rollback()  # release B-2 and group B
+        s1.commit()    # A-0 consumption done, A group now free
+        s2.rollback()  # releases B-2, and group B
 
         ## Q: ["A1","B2","B3","A4","B5"]
 
@@ -477,6 +477,102 @@ class MultiConsumerMsgGroupTests(Base):
         s2.acknowledge()
         s2.commit()
 
+    def test_send_transaction(self):
+        """ Verify behavior when sender is using transactions.
+        """
+        ssn = self.conn.session(transactional=True)
+        snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
+                         " node: {x-declare: {arguments:" +
+                         " {'qpid.group_header_key':'THE-GROUP'," +
+                         "'qpid.shared_msg_group':1}}}}")
+
+        msg = Message(content={'index':0}, properties={"THE-GROUP": "A"})
+        snd.send(msg)
+        msg = Message(content={'index':1}, properties={"THE-GROUP": "B"})
+        snd.send(msg)
+        snd.session.commit()
+        msg = Message(content={'index':2}, properties={"THE-GROUP": "A"})
+        snd.send(msg)
+
+        # Queue: [A0,B1, (uncommitted: A2) ]
+
+        s1 = self.conn.session(transactional=True)
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
+        s2 = self.conn.session(transactional=True)
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+        # C1 gets A0, group A
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # C2 gets B1, group B
+        m2 = c2.fetch(0)
+        assert m2.properties['THE-GROUP'] == 'B'
+        assert m2.content['index'] == 1
+
+        # Since A2 is still uncommitted, there should be nothing left to fetch
+        try:
+            mX = c1.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+        try:
+            mX = c2.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+
+        snd.session.commit()
+        msg = Message(content={'index':3}, properties={"THE-GROUP": "B"})
+        snd.send(msg)
+
+        # Queue: [A2, (uncommitted: B3) ]
+
+        # B3 has yet to be committed, so C2 should see nothing available:
+        try:
+            mX = c2.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+
+        # but A2 should be available to C1
+        m3 = c1.fetch(0)
+        assert m3.properties['THE-GROUP'] == 'A'
+        assert m3.content['index'] == 2
+
+        # now make B3 available
+        snd.session.commit()
+
+        # C1 should still be done:
+        try:
+            mX = c1.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+
+        # but C2 should find the new B
+        m4 = c2.fetch(0)
+        assert m4.properties['THE-GROUP'] == 'B'
+        assert m4.content['index'] == 3
+
+        # extra: have C1 rollback, verify C2 finds the released 'A' messages
+        c1.session.rollback()
+
+        ## Q: ["A0","A2"]
+
+        # C2 should be able to get the next A
+        m5 = c2.fetch(0)
+        assert m5.properties['THE-GROUP'] == 'A'
+        assert m5.content['index'] == 0
+
+        m6 = c2.fetch(0)
+        assert m6.properties['THE-GROUP'] == 'A'
+        assert m6.content['index'] == 2
+
+        c2.session.acknowledge()
+        c2.session.commit()
+
     def test_query(self):
         """ Verify the queue query method against message groups
         """
@@ -494,9 +590,9 @@ class MultiConsumerMsgGroupTests(Base):
             snd.send(m)
 
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         m1 = c1.fetch(0)
         m2 = c2.fetch(0)
@@ -568,7 +664,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all B's removed....
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -599,7 +695,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # acquire group "A"
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
         assert m1.content['index'] == 0
@@ -620,7 +716,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all other A's removed....
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -652,7 +748,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # acquire group "A"
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
         assert m1.content['index'] == 0
@@ -673,7 +769,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all other A's removed....
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         a_count = 0
         try:
@@ -714,7 +810,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # acquire group "A"
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
         assert m1.content['index'] == 0
@@ -733,7 +829,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all other A's removed from msg-group-q
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -746,7 +842,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify the moved A's are at the dest-q
         s2 = self.setup_session()
-        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -797,7 +893,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all B's removed from msg-group-q
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -810,7 +906,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify the moved B's are at the dest-q
         s2 = self.setup_session()
-        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -846,7 +942,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # acquire group "A"
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         m1 = c1.fetch(0)
         assert m1.properties['THE-GROUP'] == 'A'
         assert m1.content['index'] == 0
@@ -867,7 +963,7 @@ class MultiConsumerMsgGroupTests(Base):
 
         # verify all other A's removed....
         s2 = self.setup_session()
-        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
         count = 0
         try:
             while True:
@@ -917,9 +1013,9 @@ class MultiConsumerMsgGroupTests(Base):
 
         # create consumers
         s1 = self.setup_session()
-        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
         s2 = self.setup_session()
-        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
 
         # C1 should acquire A-0
         m1 = c1.fetch(0);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org