You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/27 21:07:44 UTC
svn commit: r1771647 - in /qpid/java/trunk:
systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
test-profiles/Java10UninvestigatedTestsExcludes
Author: rgodfrey
Date: Sun Nov 27 21:07:44 2016
New Revision: 1771647
URL: http://svn.apache.org/viewvc?rev=1771647&view=rev
Log:
QPID-7546 : MessageGroupQueueTest
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Sun Nov 27 21:07:44 2016
@@ -353,7 +353,7 @@ public class QpidBrokerTestCase extends
}
else
{
- return getConnectionWithOptions(Collections.singletonMap("max_prefetch", String.valueOf(prefetch)));
+ return getConnectionWithOptions(Collections.singletonMap("maxprefetch", String.valueOf(prefetch)));
}
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Sun Nov 27 21:07:44 2016
@@ -36,10 +36,10 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class MessageGroupQueueTest extends QpidBrokerTestCase
@@ -62,8 +62,8 @@ public class MessageGroupQueueTest exten
producerConnection.start();
- consumerConnection = getConnection();
-
+ consumerConnection = getConnectionWithPrefetch(1);
+
}
protected void tearDown() throws Exception
@@ -114,17 +114,7 @@ public class MessageGroupQueueTest exten
*/
private void simpleGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
- if(sharedGroups)
- {
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
- }
- ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
- queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createQueueAndProducer(sharedGroups);
String[] groups = { "ONE", "TWO"};
@@ -137,8 +127,8 @@ public class MessageGroupQueueTest exten
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session cs2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -174,6 +164,41 @@ public class MessageGroupQueueTest exten
assertNull(consumer2.receive(1000));
}
+ private void createQueueAndProducer(final boolean sharedGroups) throws QpidException, JMSException
+ {
+ if(isBroker10())
+ {
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
+ arguments.put(ConfiguredObject.DURABLE, "false");
+ arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
+ if(sharedGroups)
+ {
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_SHARED_GROUPS, "true");
+ }
+ createEntityUsingAmqpManagement(QUEUE, producerSession, "org.apache.qpid.Queue", arguments);
+ queue = producerSession.createQueue(QUEUE);
+ }
+ else
+ {
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, "group");
+ if (sharedGroups)
+ {
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, "1");
+ }
+ ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"
+ + QUEUE
+ + "/"
+ + QUEUE
+ + "?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination) queue);
+ }
+ producer = producerSession.createProducer(queue);
+ }
+
public void testConsumerCloseGroupAssignment() throws Exception
{
@@ -204,17 +229,7 @@ public class MessageGroupQueueTest exten
**/
private void consumerCloseGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
- if(sharedGroups)
- {
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
- }
- ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
- queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createQueueAndProducer(sharedGroups);
producer.send(createMessage(1, "ONE"));
producer.send(createMessage(2, "ONE"));
@@ -225,9 +240,9 @@ public class MessageGroupQueueTest exten
producer.close();
producerSession.close();
producerConnection.close();
-
- Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ boolean is010 = isBroker010();
+ Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+ Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -242,7 +257,14 @@ public class MessageGroupQueueTest exten
assertNotNull("Consumer 2 should have received first message", cs2Received);
assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
- cs2.commit();
+ if(is010)
+ {
+ cs2Received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
Message cs2Received2 = consumer2.receive(1000);
@@ -250,14 +272,28 @@ public class MessageGroupQueueTest exten
consumer1.close();
- cs1.commit();
+ if(is010)
+ {
+ cs1Received.acknowledge();
+ }
+ else
+ {
+ cs1.commit();
+ }
Message cs2Received3 = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received second message", cs2Received3);
assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
- cs2.commit();
+ if(is010)
+ {
+ cs2Received3.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
Message cs2Received4 = consumer2.receive(1000);
@@ -265,7 +301,14 @@ public class MessageGroupQueueTest exten
assertNotNull("Consumer 2 should have received third message", cs2Received4);
assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
- cs2.commit();
+ if(is010)
+ {
+ cs2Received4.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
assertNull(consumer2.receive(1000));
}
@@ -303,18 +346,7 @@ public class MessageGroupQueueTest exten
*/
private void consumerCloseWithRelease(boolean sharedGroups) throws QpidException, JMSException
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
- if(sharedGroups)
- {
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
- }
-
- ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
- queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createQueueAndProducer(sharedGroups);
producer.send(createMessage(1, "ONE"));
producer.send(createMessage(2, "ONE"));
@@ -326,8 +358,9 @@ public class MessageGroupQueueTest exten
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ boolean is010 = isBroker010();
+ Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+ Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -345,13 +378,21 @@ public class MessageGroupQueueTest exten
assertNotNull("Consumer 2 should have received its first message", received);
assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
- received = consumer2.receive(1000);
+ Message received2 = consumer2.receive(1000);
- assertNull("Consumer 2 should not yet have received second message", received);
+ assertNull("Consumer 2 should not yet have received second message", received2);
consumer1.close();
cs1.close();
- cs2.commit();
+ if(is010)
+ {
+ received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
+
received = consumer2.receive(1000);
assertNotNull("Consumer 2 should now have received second message", received);
@@ -360,8 +401,14 @@ public class MessageGroupQueueTest exten
assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
received.getJMSRedelivered());
- cs2.commit();
-
+ if(is010)
+ {
+ received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
received = consumer2.receive(1000);
@@ -369,7 +416,14 @@ public class MessageGroupQueueTest exten
assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
- cs2.commit();
+ if(is010)
+ {
+ received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
received = consumer2.receive(1000);
@@ -377,8 +431,14 @@ public class MessageGroupQueueTest exten
assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
- cs2.commit();
-
+ if(is010)
+ {
+ received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
assertNull(consumer2.receive(1000));
}
@@ -395,18 +455,7 @@ public class MessageGroupQueueTest exten
private void groupAssignmentOnEmpty(boolean sharedGroups) throws QpidException, JMSException
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
- if(sharedGroups)
- {
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
- }
-
- ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
- queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createQueueAndProducer(sharedGroups);
producer.send(createMessage(1, "ONE"));
producer.send(createMessage(2, "TWO"));
@@ -418,8 +467,9 @@ public class MessageGroupQueueTest exten
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ boolean is010 = isBroker010();
+ Session cs1 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
+ Session cs2 = consumerConnection.createSession(!is010, is010 ? Session.CLIENT_ACKNOWLEDGE : Session.SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -428,20 +478,27 @@ public class MessageGroupQueueTest exten
MessageConsumer consumer2 = cs2.createConsumer(queue);
- Message received = consumer1.receive(1000);
- assertNotNull("Consumer 1 should have received its first message", received);
- assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
- received = consumer2.receive(1000);
+ Message cs2Received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received its first message", received);
- assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+ assertNotNull("Consumer 2 should have received its first message", cs2Received);
+ assertEquals("incorrect message received", 2, cs2Received.getIntProperty("msg"));
- cs1.commit();
+ if(is010)
+ {
+ cs1Received.acknowledge();
+ }
+ else
+ {
+ cs1.commit();
+ }
- received = consumer1.receive(1000);
- assertNotNull("Consumer 1 should have received its second message", received);
- assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+ cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its second message", cs1Received);
+ assertEquals("incorrect message received", 3, cs1Received.getIntProperty("msg"));
// We expect different behaviours from "shared groups": here the assignment of a subscription to a group
// is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a
@@ -449,26 +506,54 @@ public class MessageGroupQueueTest exten
// registered
if(sharedGroups)
{
- cs2.commit();
- received = consumer2.receive(1000);
-
- assertNotNull("Consumer 2 should have received its second message", received);
- assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
-
- cs2.commit();
+ if(is010)
+ {
+ cs2Received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
+ cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its second message", cs2Received);
+ assertEquals("incorrect message received", 4, cs2Received.getIntProperty("msg"));
+
+ if(is010)
+ {
+ cs2Received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
}
else
{
- cs2.commit();
- received = consumer2.receive(1000);
-
- assertNull("Consumer 2 should not have received a second message", received);
-
- cs1.commit();
-
- received = consumer1.receive(1000);
- assertNotNull("Consumer 1 should have received its third message", received);
- assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+ if(is010)
+ {
+ cs2Received.acknowledge();
+ }
+ else
+ {
+ cs2.commit();
+ }
+ cs2Received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received a second message", cs2Received);
+
+ if(is010)
+ {
+ cs1Received.acknowledge();
+ }
+ else
+ {
+ cs1.commit();
+ }
+
+ cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its third message", cs1Received);
+ assertEquals("incorrect message received", 4, cs1Received.getIntProperty("msg"));
}
@@ -490,29 +575,15 @@ public class MessageGroupQueueTest exten
*/
public void testSingleSharedGroupWithMultipleConsumers() throws Exception
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"group");
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
-
- ((AMQSession) producerSession).createQueue(QUEUE, true, false, false, arguments);
- queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
-
-
- consumerConnection.close();
- Map<String, String> options = new HashMap<String, String>();
- options.put(ConnectionURL.OPTIONS_MAXPREFETCH, "1");
- consumerConnection = getConnectionWithOptions(options);
+ createQueueAndProducer(true);
int numMessages = 100;
SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session cs3 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session cs4 = ((AMQConnection)consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs1 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs2 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs3 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session cs4 = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = cs1.createConsumer(queue);
consumer1.setMessageListener(groupingTestMessageListener);
Modified: qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes?rev=1771647&r1=1771646&r2=1771647&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10UninvestigatedTestsExcludes Sun Nov 27 21:07:44 2016
@@ -46,7 +46,6 @@ org.apache.qpid.server.queue.ProducerFlo
org.apache.qpid.server.queue.PriorityQueueTest#*
org.apache.qpid.server.queue.MultipleTransactedBatchProducerTest#*
org.apache.qpid.server.queue.ModelTest#*
-org.apache.qpid.server.queue.MessageGroupQueueTest#*
org.apache.qpid.server.queue.LiveQueueOperationsTest#*
org.apache.qpid.server.queue.LastValueQueueTest#*
org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org