You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/27 21:07:44 UTC

svn commit: r1771647 - in /qpid/java/trunk: systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java test-profiles/Java10UninvestigatedTestsExcludes

Author: rgodfrey
Date: Sun Nov 27 21:07:44 2016
New Revision: 1771647

URL: http://svn.apache.org/viewvc?rev=1771647&view=rev
Log:
QPID-7546 : MessageGroupQueueTest

Modified:
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
    qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Sun Nov 27 21:07:44 2016
@@ -353,7 +353,7 @@ public class QpidBrokerTestCase extends
         }
         else
         {
-            return getConnectionWithOptions(Collections.singletonMap("max_prefetch", String.valueOf(prefetch)));
+            return getConnectionWithOptions(Collections.singletonMap("maxprefetch", String.valueOf(prefetch)));
         }
     }
 

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Sun Nov 27 21:07:44 2016
@@ -36,10 +36,10 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class MessageGroupQueueTest extends QpidBrokerTestCase
@@ -62,8 +62,8 @@ public class MessageGroupQueueTest exten
 
         producerConnection.start();
 
-        consumerConnection = getConnection();
-        
+        consumerConnection = getConnectionWithPrefetch(1);
+
     }
 
     protected void tearDown() throws Exception
@@ -114,17 +114,7 @@ public class MessageGroupQueueTest exten
      */
     private void simpleGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
-        if(sharedGroups)
-        {
-            arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-        }
-        ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
-        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
+        createQueueAndProducer(sharedGroups);
 
         String[] groups = { "ONE", "TWO"};
 
@@ -137,8 +127,8 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+        Session cs1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Session cs2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -174,6 +164,41 @@ public class MessageGroupQueueTest exten
         assertNull(consumer2.receive(1000));
     }
 
+    private void createQueueAndProducer(final boolean sharedGroups) throws QpidException, JMSException
+    {
+        if(isBroker10())
+        {
+            final Map<String, Object> arguments = new HashMap<>();
+            arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
+            arguments.put(ConfiguredObject.DURABLE, "false");
+            arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
+            if(sharedGroups)
+            {
+                arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_SHARED_GROUPS, "true");
+            }
+            createEntityUsingAmqpManagement(QUEUE, producerSession, "org.apache.qpid.Queue", arguments);
+            queue = producerSession.createQueue(QUEUE);
+        }
+        else
+        {
+            final Map<String, Object> arguments = new HashMap<>();
+            arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, "group");
+            if (sharedGroups)
+            {
+                arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, "1");
+            }
+            ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
+            queue = (Queue) producerSession.createQueue("direct://amq.direct/"
+                                                        + QUEUE
+                                                        + "/"
+                                                        + QUEUE
+                                                        + "?durable='false'&autodelete='true'");
+
+            ((AMQSession) producerSession).declareAndBind((AMQDestination) queue);
+        }
+        producer = producerSession.createProducer(queue);
+    }
+
 
     public void testConsumerCloseGroupAssignment() throws Exception
     {
@@ -204,17 +229,7 @@ public class MessageGroupQueueTest exten
      **/
     private void consumerCloseGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
-        if(sharedGroups)
-        {
-            arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-        }
-        ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
-        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
+        createQueueAndProducer(sharedGroups);
 
         producer.send(createMessage(1, "ONE"));
         producer.send(createMessage(2, "ONE"));
@@ -225,9 +240,9 @@ public class MessageGroupQueueTest exten
         producer.close();
         producerSession.close();
         producerConnection.close();
-
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+        boolean is010 = isBroker010();
+        Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+        Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
 
@@ -242,7 +257,14 @@ public class MessageGroupQueueTest exten
 
         assertNotNull("Consumer 2 should have received first message", cs2Received);
         assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
-        cs2.commit();
+        if(is010)
+        {
+            cs2Received.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
         Message cs2Received2 = consumer2.receive(1000);
 
@@ -250,14 +272,28 @@ public class MessageGroupQueueTest exten
 
         consumer1.close();
 
-        cs1.commit();
+        if(is010)
+        {
+            cs1Received.acknowledge();
+        }
+        else
+        {
+            cs1.commit();
+        }
         Message cs2Received3 = consumer2.receive(1000);
 
         assertNotNull("Consumer 2 should have received second message", cs2Received3);
         assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
         assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
 
-        cs2.commit();
+        if(is010)
+        {
+            cs2Received3.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
 
         Message cs2Received4 = consumer2.receive(1000);
@@ -265,7 +301,14 @@ public class MessageGroupQueueTest exten
         assertNotNull("Consumer 2 should have received third message", cs2Received4);
         assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
         assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
-        cs2.commit();
+        if(is010)
+        {
+            cs2Received4.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
         assertNull(consumer2.receive(1000));
     }
@@ -303,18 +346,7 @@ public class MessageGroupQueueTest exten
      */
     private void consumerCloseWithRelease(boolean sharedGroups) throws QpidException, JMSException
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
-        if(sharedGroups)
-        {
-            arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-        }
-
-        ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
-        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
+        createQueueAndProducer(sharedGroups);
 
         producer.send(createMessage(1, "ONE"));
         producer.send(createMessage(2, "ONE"));
@@ -326,8 +358,9 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+        boolean is010 = isBroker010();
+        Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+        Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
 
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -345,13 +378,21 @@ public class MessageGroupQueueTest exten
         assertNotNull("Consumer 2 should have received its first message", received);
         assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
 
-        received = consumer2.receive(1000);
+        Message received2 = consumer2.receive(1000);
 
-        assertNull("Consumer 2 should not yet have received second message", received);
+        assertNull("Consumer 2 should not yet have received second message", received2);
 
         consumer1.close();
         cs1.close();
-        cs2.commit();
+        if(is010)
+        {
+            received.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
+
         received = consumer2.receive(1000);
 
         assertNotNull("Consumer 2 should now have received second message", received);
@@ -360,8 +401,14 @@ public class MessageGroupQueueTest exten
         assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
                    received.getJMSRedelivered());
 
-        cs2.commit();
-
+        if(is010)
+        {
+            received.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
         received = consumer2.receive(1000);
 
@@ -369,7 +416,14 @@ public class MessageGroupQueueTest exten
         assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
         assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
 
-        cs2.commit();
+        if(is010)
+        {
+            received.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
         received = consumer2.receive(1000);
 
@@ -377,8 +431,14 @@ public class MessageGroupQueueTest exten
         assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
         assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
 
-        cs2.commit();
-
+        if(is010)
+        {
+            received.acknowledge();
+        }
+        else
+        {
+            cs2.commit();
+        }
 
         assertNull(consumer2.receive(1000));
     }
@@ -395,18 +455,7 @@ public class MessageGroupQueueTest exten
 
     private void groupAssignmentOnEmpty(boolean sharedGroups) throws QpidException, JMSException
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
-        if(sharedGroups)
-        {
-            arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-        }
-
-        ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
-        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
+        createQueueAndProducer(sharedGroups);
 
         producer.send(createMessage(1, "ONE"));
         producer.send(createMessage(2, "TWO"));
@@ -418,8 +467,9 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+        boolean is010 = isBroker010();
+        Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+        Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
 
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -428,20 +478,27 @@ public class MessageGroupQueueTest exten
 
         MessageConsumer consumer2 = cs2.createConsumer(queue);
 
-        Message received = consumer1.receive(1000);
-        assertNotNull("Consumer 1 should have received its first message", received);
-        assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+        Message cs1Received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received its first message", cs1Received);
+        assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
 
-        received = consumer2.receive(1000);
+        Message cs2Received = consumer2.receive(1000);
 
-        assertNotNull("Consumer 2 should have received its first message", received);
-        assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+        assertNotNull("Consumer 2 should have received its first message", cs2Received);
+        assertEquals("incorrect message received", 2, cs2Received.getIntProperty("msg"));
 
-        cs1.commit();
+        if(is010)
+        {
+            cs1Received.acknowledge();
+        }
+        else
+        {
+            cs1.commit();
+        }
 
-        received = consumer1.receive(1000);
-        assertNotNull("Consumer 1 should have received its second message", received);
-        assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+        cs1Received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received its second message", cs1Received);
+        assertEquals("incorrect message received", 3, cs1Received.getIntProperty("msg"));
 
         // We expect different behaviours from "shared groups": here the assignment of a subscription to a group
         // is terminated when there are no outstanding delivered but unacknowledged messages.  In contrast, with a
@@ -449,26 +506,54 @@ public class MessageGroupQueueTest exten
         // registered
         if(sharedGroups)
         {
-            cs2.commit();
-            received = consumer2.receive(1000);
-
-            assertNotNull("Consumer 2 should have received its second message", received);
-            assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
-
-            cs2.commit();
+            if(is010)
+            {
+                cs2Received.acknowledge();
+            }
+            else
+            {
+                cs2.commit();
+            }
+            cs2Received = consumer2.receive(1000);
+
+            assertNotNull("Consumer 2 should have received its second message", cs2Received);
+            assertEquals("incorrect message received", 4, cs2Received.getIntProperty("msg"));
+
+            if(is010)
+            {
+                cs2Received.acknowledge();
+            }
+            else
+            {
+                cs2.commit();
+            }
         }
         else
         {
-            cs2.commit();
-            received = consumer2.receive(1000);
-
-            assertNull("Consumer 2 should not have received a second message", received);
-
-            cs1.commit();
-
-            received = consumer1.receive(1000);
-            assertNotNull("Consumer 1 should have received its third message", received);
-            assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+            if(is010)
+            {
+                cs2Received.acknowledge();
+            }
+            else
+            {
+                cs2.commit();
+            }
+            cs2Received = consumer2.receive(1000);
+
+            assertNull("Consumer 2 should not have received a second message", cs2Received);
+
+            if(is010)
+            {
+                cs1Received.acknowledge();
+            }
+            else
+            {
+                cs1.commit();
+            }
+
+            cs1Received = consumer1.receive(1000);
+            assertNotNull("Consumer 1 should have received its third message", cs1Received);
+            assertEquals("incorrect message received", 4, cs1Received.getIntProperty("msg"));
 
         }
 
@@ -490,29 +575,15 @@ public class MessageGroupQueueTest exten
      */
     public void testSingleSharedGroupWithMultipleConsumers() throws Exception
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
-        arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-
-        ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
-        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
-
-
-        consumerConnection.close();
-        Map<String, String> options = new HashMap<String, String>();
-        options.put(ConnectionURL.OPTIONS_MAXPREFETCH, "1");
-        consumerConnection = getConnectionWithOptions(options);
+        createQueueAndProducer(true);
 
         int numMessages = 100;
         SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Session cs3 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Session cs4 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs1 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs2 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs3 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session cs4 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
         consumer1.setMessageListener(groupingTestMessageListener);

Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Sun Nov 27 21:07:44 2016
@@ -46,7 +46,6 @@ org.apache.qpid.server.queue.ProducerFlo
 org.apache.qpid.server.queue.PriorityQueueTest#*
 org.apache.qpid.server.queue.MultipleTransactedBatchProducerTest#*
 org.apache.qpid.server.queue.ModelTest#*
-org.apache.qpid.server.queue.MessageGroupQueueTest#*
 org.apache.qpid.server.queue.LiveQueueOperationsTest#*
 org.apache.qpid.server.queue.LastValueQueueTest#*
 org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org