You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/23 17:52:43 UTC

svn commit: r531512 - in /incubator/qpid/branches/M2/java/client/src: main/java/org/apache/qpid/client/AMQTemporaryQueue.java test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java

Author: ritchiem
Date: Mon Apr 23 08:52:43 2007
New Revision: 531512

URL: http://svn.apache.org/viewvc?view=rev&rev=531512
Log:
QPID-472 - Creation of TemporaryQueues will not guarantee unique queue names if created rapidly.

Updated TemporaryQueueTest.java so that it checks Headers/Queue/Topic for unroutable/mandatory messages being returned.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=531512&r1=531511&r2=531512
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Mon Apr 23 08:52:43 2007
@@ -25,9 +25,10 @@
 
 import org.apache.qpid.framing.AMQShortString;
 
-/**
- * AMQ implementation of a TemporaryQueue.
- */
+import java.util.Random;
+import java.util.UUID;
+
+/** AMQ implementation of a TemporaryQueue. */
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
 
@@ -35,21 +36,17 @@
     private final AMQSession _session;
     private boolean _deleted;
 
-    /**
-     * Create a new instance of an AMQTemporaryQueue
-     */
+    /** Create a new instance of an AMQTemporaryQueue */
     public AMQTemporaryQueue(AMQSession session)
     {
-        super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
+        super(session.getTemporaryQueueExchangeName(), new AMQShortString("TempQueue" + UUID.randomUUID()), true);
         _session = session;
     }
 
-    /**
-     * @see javax.jms.TemporaryQueue#delete()
-     */
+    /** @see javax.jms.TemporaryQueue#delete() */
     public synchronized void delete() throws JMSException
     {
-        if(_session.hasConsumer(this))
+        if (_session.hasConsumer(this))
         {
             throw new JMSException("Temporary Queue has consumers so cannot be deleted");
         }

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?view=diff&rev=531512&r1=531511&r2=531512
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Mon Apr 23 08:52:43 2007
@@ -7,14 +7,20 @@
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
 
 import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.URLSyntaxException;
 
+import java.util.List;
+import java.util.LinkedList;
+
 public class TemporaryQueueTest extends TestCase
 {
 
@@ -35,7 +41,7 @@
     protected Connection createConnection() throws AMQException, URLSyntaxException
     {
         return new AMQConnection(_broker, "guest", "guest",
-                                                      "fred", "test");
+                                 "fred", "test");
     }
 
     public void testTempoaryQueue() throws Exception
@@ -50,14 +56,14 @@
         producer.send(session.createTextMessage("hello"));
         TextMessage tm = (TextMessage) consumer.receive(2000);
         assertNotNull(tm);
-        assertEquals("hello",tm.getText());
+        assertEquals("hello", tm.getText());
 
         try
         {
             queue.delete();
             fail("Expected JMSException : should not be able to delete while there are active consumers");
         }
-        catch(JMSException je)
+        catch (JMSException je)
         {
             ; //pass
         }
@@ -68,12 +74,133 @@
         {
             queue.delete();
         }
-        catch(JMSException je)
+        catch (JMSException je)
         {
             fail("Unexpected Exception: " + je.getMessage());
         }
 
         conn.close();
+    }
+
+    public void tUniqueness() throws JMSException, AMQException, URLSyntaxException
+    {
+        int numProcs = Runtime.getRuntime().availableProcessors();
+        final int threadsProc = 5;
+
+        runUniqueness(1, 10);
+        runUniqueness(numProcs * threadsProc, 10);
+        runUniqueness(numProcs * threadsProc, 100);
+        runUniqueness(numProcs * threadsProc, 500);
+    }
+
+    void runUniqueness(int makers, int queues) throws JMSException, AMQException, URLSyntaxException
+    {
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+
+        //Create Makers
+        for (int m = 0; m < makers; m++)
+        {
+            tqList.add(new TempQueueMaker(session, queues));
+        }
+
+
+        List<Thread> threadList = new LinkedList<Thread>();
+
+        //Create threads
+        for (TempQueueMaker maker : tqList)
+        {
+            threadList.add(new Thread(maker));
+        }
+
+        //Start threads
+        for (Thread thread : threadList)
+        {
+            thread.start();
+        }
+
+        // Join Threads
+        for (Thread thread : threadList)
+        {
+            try
+            {
+                thread.join();
+            }
+            catch (InterruptedException e)
+            {
+                fail("Couldn't correctly join threads");
+            }
+        }
+
+
+        List<AMQQueue> list = new LinkedList<AMQQueue>();
+
+        // Test values
+        for (TempQueueMaker maker : tqList)
+        {
+            check(maker, list);
+        }
+
+        Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
+
+        connection.close();
+    }
+
+    private void check(TempQueueMaker tq, List<AMQQueue> list)
+    {
+        for (AMQQueue q : tq.getList())
+        {
+            if (list.contains(q))
+            {
+                fail(q + " already exists.");
+            }
+            else
+            {
+                list.add(q);
+            }
+        }
+    }
+
+
+    class TempQueueMaker implements Runnable
+    {
+        List<AMQQueue> _queues;
+        Session _session;
+        private int _count;
+
+
+        TempQueueMaker(Session session, int queues) throws JMSException
+        {
+            _queues = new LinkedList<AMQQueue>();
+
+            _count = queues;
+
+            _session = session;
+        }
+
+        public void run()
+        {
+            int i = 0;
+            try
+            {
+                for (; i < _count; i++)
+                {
+                    _queues.add((AMQQueue) _session.createTemporaryQueue());
+                }
+            }
+            catch (JMSException jmse)
+            {
+                //stop
+            }
+        }
+
+        List<AMQQueue> getList()
+        {
+            return _queues;
+        }
     }