You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/18 00:36:30 UTC

svn commit: r900244 - /qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java

Author: rgodfrey
Date: Sun Jan 17 23:36:30 2010
New Revision: 900244

URL: http://svn.apache.org/viewvc?rev=900244&view=rev
Log:
QPID-2321: Updated the test to extend QpidTestCase, added assertions, and added a two-browser test

Modified:
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java?rev=900244&r1=900243&r2=900244&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java Sun Jan 17 23:36:30 2010
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.framing.AMQShortString;
 
@@ -41,14 +42,12 @@
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.HashMap;
+import java.util.*;
 import java.net.URISyntaxException;
 
 import junit.framework.TestCase;
 
-public class ConflationQueueTest extends TestCase
+public class ConflationQueueTest extends QpidTestCase
 {
     private static final int TIMEOUT = 1500;
 
@@ -56,13 +55,12 @@
     private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
 
 
-    protected final String BROKER = "vm://:1";
+
     protected final String VHOST = "/test";
-    protected final String QUEUE = "PriorityQueue";
+    protected final String QUEUE = "ConflationQueue";
 
-    private static final int MSG_COUNT = 4000;
+    private static final int MSG_COUNT = 400;
 
-    private Context context = null;
     private Connection producerConnection;
     private MessageProducer producer;
     private Session producerSession;
@@ -77,19 +75,7 @@
     {
         super.setUp();
 
-        if (usingInVMBroker())
-        {
-            TransportConnection.createVMBroker(1);
-        }
-
-        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-        Hashtable<String, String> env = new Hashtable<String, String>();
-
-        env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
-        env.put("queue.queue", QUEUE);
-
-        context = factory.getInitialContext(env);
-        producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        producerConnection = getConnection();
         producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         producerConnection.start();
@@ -97,25 +83,16 @@
 
     }
 
-    private boolean usingInVMBroker()
-    {
-        return BROKER.startsWith("vm://");
-    }
-
     protected void tearDown() throws Exception
     {
         producerConnection.close();
         consumerConnection.close();
-        if (usingInVMBroker())
-        {
-            TransportConnection.killAllVMBrokers();
-        }
         super.tearDown();
     }
 
-    public void testConflation() throws JMSException, NamingException, AMQException, InterruptedException
+    public void testConflation() throws Exception
     {
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
@@ -128,13 +105,7 @@
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
-
+            producer.send(nextMessage(msg, producerSession));
         }
 
         producer.close();
@@ -144,23 +115,28 @@
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
         Message received;
-        int receivedCount = 0;
+
+        List<Message> messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
+        assertEquals("Unexpected number of messages received",10,messages.size());
 
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+        }
 
 
     }
 
 
-    public void testConflationWithRelease() throws JMSException, NamingException, AMQException, InterruptedException
+    public void testConflationWithRelease() throws Exception
     {
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
@@ -173,70 +149,71 @@
 
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
+            producer.send(nextMessage(msg, producerSession));
 
         }
 
         // HACK to do something synchronous
-        producerSession.createTemporaryQueue();
+        ((AMQSession)producerSession).sync();
 
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
         Message received;
-        int receivedCount = 0;
+        List<Message> messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
+        assertEquals("Unexpected number of messages received",10,messages.size());
+
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+        }
 
         consumerSession.close();
         consumerConnection.close();
 
 
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
         for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
-
+            producer.send(nextMessage(msg, producerSession));
         }
 
 
+        // HACK to do something synchronous
+        ((AMQSession)producerSession).sync();
+        
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
-        receivedCount = 0;
+
+        messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
-
+        assertEquals("Unexpected number of messages received",10,messages.size());
 
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+        }
 
     }
 
 
 
-    public void testConflationWithReleaseAfterNewPublish() throws JMSException, NamingException, AMQException, InterruptedException
+    public void testConflationWithReleaseAfterNewPublish() throws Exception
     {
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
 
@@ -249,72 +226,72 @@
 
         for (int msg = 0; msg < MSG_COUNT/2; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
-
+            producer.send(nextMessage(msg, producerSession));
         }
 
         // HACK to do something synchronous
-        producerSession.createTemporaryQueue();
+        ((AMQSession)producerSession).sync();
 
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
         Message received;
-        int receivedCount = 0;
+        List<Message> messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
+        assertEquals("Unexpected number of messages received",10,messages.size());
+
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+        }
 
         consumer.close();
 
         for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
-
+            producer.send(nextMessage(msg, producerSession));
         }
 
+        // HACK to do something synchronous
+        ((AMQSession)producerSession).sync();
 
 
+        // this causes the "old" messages to be released
         consumerSession.close();
         consumerConnection.close();
 
 
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
 
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
-        receivedCount = 0;
+        
+        messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
-
+        assertEquals("Unexpected number of messages received",10,messages.size());
 
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+        }
 
     }
 
-    public void testConflationBrowser() throws JMSException, NamingException, AMQException, InterruptedException, URISyntaxException
+    public void testConflationBrowser() throws Exception
     {
-        consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
 
@@ -327,44 +304,45 @@
 
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
-            producer.send(nextMessage(msg, false, producerSession, producer));
-            if(msg%10000 == 0)
-            {
-                System.err.println("Sent... " + msg);
-                Thread.sleep(1000);
-            }
+            producer.send(nextMessage(msg, producerSession));
 
         }
 
         ((AMQSession)producerSession).sync();
 
-        //
         AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
         AMQQueue browseQueue = new AMQQueue(url);
 
         consumer = consumerSession.createConsumer(browseQueue);
         consumerConnection.start();
         Message received;
-        int receivedCount = 0;
+        List<Message> messages = new ArrayList<Message>();
         while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
 
-        System.err.println("Received Count: " + receivedCount);
+        assertEquals("Unexpected number of messages received",10,messages.size());
+
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+        }
 
-        producer.send(nextMessage(MSG_COUNT, false, producerSession, producer));
+        messages.clear();
+
+        producer.send(nextMessage(MSG_COUNT, producerSession));
 
         ((AMQSession)producerSession).sync();
 
-        while((received = consumer.receive(5000))!=null)
+        while((received = consumer.receive(1000))!=null)
         {
-            receivedCount++;
-            System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+            messages.add(received);
         }
+        assertEquals("Unexpected number of messages received",1,messages.size());
+        assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
 
-        System.err.println("Received Count: " + receivedCount);
 
         producer.close();
         producerSession.close();
@@ -375,8 +353,78 @@
     }
 
 
+    public void testConflation2Browsers() throws Exception
+    {
+        consumerConnection = getConnection();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("qpid.last_value_queue_key","key");
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.send(nextMessage(msg, producerSession));
+
+        }
 
-    private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+        ((AMQSession)producerSession).sync();
+
+        AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
+        AMQQueue browseQueue = new AMQQueue(url);
+
+        consumer = consumerSession.createConsumer(browseQueue);
+        MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
+        consumerConnection.start();
+        List<Message> messages = new ArrayList<Message>();
+        List<Message> messages2 = new ArrayList<Message>();
+        Message received  = consumer.receive(1000);
+        Message received2  = consumer2.receive(1000);
+
+        while(received!=null || received2!=null)
+        {
+            if(received != null)
+            {
+                messages.add(received);
+            }
+            if(received2 != null)
+            {
+                messages2.add(received2);
+            }
+
+
+            received  = consumer.receive(1000);
+            received2  = consumer2.receive(1000);
+
+        }
+
+        assertEquals("Unexpected number of messages received on first browser",10,messages.size());
+        assertEquals("Unexpected number of messages received on second browser",10,messages2.size());
+
+        for(int i = 0 ; i < 10; i++)
+        {
+            Message msg = messages.get(i);
+            assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+            msg = messages2.get(i);
+            assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+        }
+
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+
+
+    }
+
+
+
+    private Message nextMessage(int msg, Session producerSession) throws JMSException
     {
         Message send = producerSession.createTextMessage("Message: " + msg);
 
@@ -389,3 +437,4 @@
 
 }
 
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org