You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/18 00:36:30 UTC
svn commit: r900244 -
/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
Author: rgodfrey
Date: Sun Jan 17 23:36:30 2010
New Revision: 900244
URL: http://svn.apache.org/viewvc?rev=900244&view=rev
Log:
QPID-2321 : updated test to use QpidTestCase, put in assertions, added 2 browsers test
Modified:
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java?rev=900244&r1=900243&r2=900244&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java Sun Jan 17 23:36:30 2010
@@ -27,6 +27,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.AMQException;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.framing.AMQShortString;
@@ -41,14 +42,12 @@
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.*;
import java.net.URISyntaxException;
import junit.framework.TestCase;
-public class ConflationQueueTest extends TestCase
+public class ConflationQueueTest extends QpidTestCase
{
private static final int TIMEOUT = 1500;
@@ -56,13 +55,12 @@
private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
- protected final String BROKER = "vm://:1";
+
protected final String VHOST = "/test";
- protected final String QUEUE = "PriorityQueue";
+ protected final String QUEUE = "ConflationQueue";
- private static final int MSG_COUNT = 4000;
+ private static final int MSG_COUNT = 400;
- private Context context = null;
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
@@ -77,19 +75,7 @@
{
super.setUp();
- if (usingInVMBroker())
- {
- TransportConnection.createVMBroker(1);
- }
-
- InitialContextFactory factory = new PropertiesFileInitialContextFactory();
- Hashtable<String, String> env = new Hashtable<String, String>();
-
- env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
-
- context = factory.getInitialContext(env);
- producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ producerConnection = getConnection();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
@@ -97,25 +83,16 @@
}
- private boolean usingInVMBroker()
- {
- return BROKER.startsWith("vm://");
- }
-
protected void tearDown() throws Exception
{
producerConnection.close();
consumerConnection.close();
- if (usingInVMBroker())
- {
- TransportConnection.killAllVMBrokers();
- }
super.tearDown();
}
- public void testConflation() throws JMSException, NamingException, AMQException, InterruptedException
+ public void testConflation() throws Exception
{
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -128,13 +105,7 @@
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
-
+ producer.send(nextMessage(msg, producerSession));
}
producer.close();
@@ -144,23 +115,28 @@
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received;
- int receivedCount = 0;
+
+ List<Message> messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
+ assertEquals("Unexpected number of messages received",10,messages.size());
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
}
- public void testConflationWithRelease() throws JMSException, NamingException, AMQException, InterruptedException
+ public void testConflationWithRelease() throws Exception
{
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -173,70 +149,71 @@
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
+ producer.send(nextMessage(msg, producerSession));
}
// HACK to do something synchronous
- producerSession.createTemporaryQueue();
+ ((AMQSession)producerSession).sync();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received;
- int receivedCount = 0;
+ List<Message> messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ }
consumerSession.close();
consumerConnection.close();
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
-
+ producer.send(nextMessage(msg, producerSession));
}
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
- receivedCount = 0;
+
+ messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
-
+ assertEquals("Unexpected number of messages received",10,messages.size());
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
}
- public void testConflationWithReleaseAfterNewPublish() throws JMSException, NamingException, AMQException, InterruptedException
+ public void testConflationWithReleaseAfterNewPublish() throws Exception
{
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -249,72 +226,72 @@
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
-
+ producer.send(nextMessage(msg, producerSession));
}
// HACK to do something synchronous
- producerSession.createTemporaryQueue();
+ ((AMQSession)producerSession).sync();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received;
- int receivedCount = 0;
+ List<Message> messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ }
consumer.close();
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
-
+ producer.send(nextMessage(msg, producerSession));
}
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+ // this causes the "old" messages to be released
consumerSession.close();
consumerConnection.close();
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
- receivedCount = 0;
+
+ messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
-
+ assertEquals("Unexpected number of messages received",10,messages.size());
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
}
- public void testConflationBrowser() throws JMSException, NamingException, AMQException, InterruptedException, URISyntaxException
+ public void testConflationBrowser() throws Exception
{
- consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -327,44 +304,45 @@
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, false, producerSession, producer));
- if(msg%10000 == 0)
- {
- System.err.println("Sent... " + msg);
- Thread.sleep(1000);
- }
+ producer.send(nextMessage(msg, producerSession));
}
((AMQSession)producerSession).sync();
- //
AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
AMQQueue browseQueue = new AMQQueue(url);
consumer = consumerSession.createConsumer(browseQueue);
consumerConnection.start();
Message received;
- int receivedCount = 0;
+ List<Message> messages = new ArrayList<Message>();
while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
- System.err.println("Received Count: " + receivedCount);
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
- producer.send(nextMessage(MSG_COUNT, false, producerSession, producer));
+ messages.clear();
+
+ producer.send(nextMessage(MSG_COUNT, producerSession));
((AMQSession)producerSession).sync();
- while((received = consumer.receive(5000))!=null)
+ while((received = consumer.receive(1000))!=null)
{
- receivedCount++;
- System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ messages.add(received);
}
+ assertEquals("Unexpected number of messages received",1,messages.size());
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
- System.err.println("Received Count: " + receivedCount);
producer.close();
producerSession.close();
@@ -375,8 +353,78 @@
}
+ public void testConflation2Browsers() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+
+ }
- private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ ((AMQSession)producerSession).sync();
+
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+
+ consumer = consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
+ consumerConnection.start();
+ List<Message> messages = new ArrayList<Message>();
+ List<Message> messages2 = new ArrayList<Message>();
+ Message received = consumer.receive(1000);
+ Message received2 = consumer2.receive(1000);
+
+ while(received!=null || received2!=null)
+ {
+ if(received != null)
+ {
+ messages.add(received);
+ }
+ if(received2 != null)
+ {
+ messages2.add(received2);
+ }
+
+
+ received = consumer.receive(1000);
+ received2 = consumer2.receive(1000);
+
+ }
+
+ assertEquals("Unexpected number of messages received on first browser",10,messages.size());
+ assertEquals("Unexpected number of messages received on second browser",10,messages2.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ msg = messages2.get(i);
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+
+ }
+
+
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
@@ -389,3 +437,4 @@
}
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org