You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/16 14:36:27 UTC

svn commit: r487801 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ test/java/org/apache/qpid/test/unit/client/temporaryqueue/

Author: rgreig
Date: Sat Dec 16 05:36:26 2006
New Revision: 487801

URL: http://svn.apache.org/viewvc?view=rev&rev=487801
Log:
QPID-202 : Implement TemporaryQueue.delete

Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Dec 16 05:36:26 2006
@@ -48,6 +48,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
@@ -102,6 +103,11 @@
     private Map _consumers = new ConcurrentHashMap();
 
     /**
+     * Maps from destination to count of JMSMessageConsumers
+     */
+    private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+            new ConcurrentHashMap<Destination, AtomicInteger>();
+    /**
      * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
      * need to be attached to a queue
      */
@@ -127,6 +133,8 @@
      */
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
+
+
     /**
      * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
      */
@@ -788,20 +796,38 @@
     public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
+        return createConsumerImpl(destination,
+                                  _defaultPrefetchHighMark,
+                                  _defaultPrefetchLowMark,
+                                  false,
+                                  false,
+                                  null,
+                                  null);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
+        return createConsumerImpl(destination,
+                                  _defaultPrefetchHighMark,
+                                  _defaultPrefetchLowMark,
+                                  false,
+                                  false,
+                                  messageSelector,
+                                  null);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
+        return createConsumerImpl(destination,
+                                  _defaultPrefetchHighMark,
+                                  _defaultPrefetchLowMark,
+                                  noLocal,
+                                  false,
+                                  messageSelector,
+                                  null);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -811,7 +837,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
     }
 
 
@@ -823,7 +849,7 @@
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
-        return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -892,11 +918,26 @@
                     throw ex;
                 }
 
+                synchronized(destination)
+                {
+                    _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+                    _destinationConsumerCount.get(destination).incrementAndGet();
+                }
+
                 return consumer;
             }
         }.execute(_connection);
     }
 
+
+    public boolean hasConsumer(TemporaryQueue destination)
+    {
+        AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+        return (counter != null) && (counter.get() != 0);
+    }
+
+
     public void declareExchange(String name, String type)
     {
         declareExchange(name, type, _connection.getProtocolHandler());
@@ -970,6 +1011,7 @@
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
+
         try
         {
             AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
@@ -1136,7 +1178,7 @@
             if (isQueueBound(dest.getQueueName()) &&
                 !isQueueBound(dest.getQueueName(), topic.getTopicName()))
             {
-                deleteSubscriptionQueue(dest.getQueueName());
+                deleteQueue(dest.getQueueName());
             }
         }
 
@@ -1146,7 +1188,7 @@
         return subscriber;
     }
 
-    private void deleteSubscriptionQueue(String queueName) throws JMSException
+    void deleteQueue(String queueName) throws JMSException
     {
         try
         {
@@ -1198,7 +1240,7 @@
     public TemporaryQueue createTemporaryQueue() throws JMSException
     {
         checkNotClosed();
-        return new AMQTemporaryQueue();
+        return new AMQTemporaryQueue(this);
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -1214,14 +1256,14 @@
         if (subscriber != null)
         {
             // send a queue.delete for the subscription
-            deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+            deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             _subscriptions.remove(name);
         }
         else
         {
             if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
             {
-                deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+                deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             }
             else
             {
@@ -1230,12 +1272,12 @@
         }
     }
 
-    private boolean isQueueBound(String queueName) throws JMSException
+    boolean isQueueBound(String queueName) throws JMSException
     {
         return isQueueBound(queueName, null);
     }
 
-    private boolean isQueueBound(String queueName, String routingKey) throws JMSException
+    boolean isQueueBound(String queueName, String routingKey) throws JMSException
     {
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
                                                                routingKey, queueName);
@@ -1374,11 +1416,19 @@
      * Called by the MessageConsumer when closing, to deregister the consumer from the
      * map from consumerTag to consumer instance.
      *
-     * @param consumerTag the consumer tag, that was broker-generated
+     * @param consumer the consum
      */
-    void deregisterConsumer(String consumerTag)
+    void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        _consumers.remove(consumerTag);
+        _consumers.remove(consumer.getConsumerTag());
+        Destination dest = consumer.getDestination();
+        synchronized(dest)
+        {
+            if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            {
+                _destinationConsumerCount.remove(dest);
+            }
+        }
     }
 
     private void registerProducer(long producerId, MessageProducer producer)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Sat Dec 16 05:36:26 2006
@@ -29,22 +29,32 @@
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
 {
 
+    private final AMQSession _session;
 
     /**
      * Create a new instance of an AMQTemporaryQueue
      */
-    public AMQTemporaryQueue()
+    public AMQTemporaryQueue(AMQSession session)
     {
         super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+        _session = session;
     }
 
     /**
      * @see javax.jms.TemporaryQueue#delete()
      */
-    public void delete() throws JMSException
+    public synchronized void delete() throws JMSException
     {
-        throw new UnsupportedOperationException("Delete not supported, " +
-                                                "will auto-delete when connection closed");
+        if(_session.hasConsumer(this))
+        {
+            throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+        }
+
+        if(_session.isQueueBound(getQueueName()))
+        {
+            _session.deleteQueue(getQueueName());
+        }
+
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sat Dec 16 05:36:26 2006
@@ -524,7 +524,7 @@
      */
     private void deregisterConsumer()
     {
-    	_session.deregisterConsumer(_consumerTag);
+    	_session.deregisterConsumer(this);
     }
 
     public String getConsumerTag()

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?view=auto&rev=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Sat Dec 16 05:36:26 2006
@@ -0,0 +1,81 @@
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.unit.client.connection.ConnectionTest;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+    String _broker = "vm://:1";
+
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        TransportConnection.killAllVMBrokers();
+    }
+
+    protected Connection createConnection() throws AMQException, URLSyntaxException
+    {
+        return new AMQConnection(_broker, "guest", "guest",
+                                                      "fred", "/test");
+    }
+
+    public void testTempoaryQueue() throws Exception
+    {
+        Connection conn = createConnection();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue queue = session.createTemporaryQueue();
+        assertNotNull(queue);
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+        conn.start();
+        producer.send(session.createTextMessage("hello"));
+        TextMessage tm = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm);
+        assertEquals("hello",tm.getText());
+
+        try
+        {
+            queue.delete();
+            fail("Expected JMSException : should not be able to delete while there are active consumers");
+        }
+        catch(JMSException je)
+        {
+            ; //pass
+        }
+
+        consumer.close();
+
+        try
+        {
+            queue.delete();
+        }
+        catch(JMSException je)
+        {
+            fail("Unexpected Exception: " + je.getMessage());
+        }
+
+        conn.close();
+    }
+
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(TemporaryQueueTest.class);
+    }
+}