You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/16 14:36:27 UTC
svn commit: r487801 - in /incubator/qpid/trunk/qpid/java/client/src:
main/java/org/apache/qpid/client/
test/java/org/apache/qpid/test/unit/client/temporaryqueue/
Author: rgreig
Date: Sat Dec 16 05:36:26 2006
New Revision: 487801
URL: http://svn.apache.org/viewvc?view=rev&rev=487801
Log:
QPID-202 : Implement TemporaryQueue.delete
Added:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Dec 16 05:36:26 2006
@@ -48,6 +48,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -102,6 +103,11 @@
private Map _consumers = new ConcurrentHashMap();
/**
+ * Maps each destination to the number of message consumers currently registered on it by this session
+ */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
+ /**
* Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
* need to be attached to a queue
*/
@@ -127,6 +133,8 @@
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -788,20 +796,38 @@
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ null,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -811,7 +837,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -823,7 +849,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -892,11 +918,26 @@
throw ex;
}
+ synchronized(destination)
+ {
+ _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.get(destination).incrementAndGet();
+ }
+
return consumer;
}
}.execute(_connection);
}
+
+ public boolean hasConsumer(TemporaryQueue destination)
+ {
+ AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+ return (counter != null) && (counter.get() != 0);
+ }
+
+
public void declareExchange(String name, String type)
{
declareExchange(name, type, _connection.getProtocolHandler());
@@ -970,6 +1011,7 @@
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
+
try
{
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
@@ -1136,7 +1178,7 @@
if (isQueueBound(dest.getQueueName()) &&
!isQueueBound(dest.getQueueName(), topic.getTopicName()))
{
- deleteSubscriptionQueue(dest.getQueueName());
+ deleteQueue(dest.getQueueName());
}
}
@@ -1146,7 +1188,7 @@
return subscriber;
}
- private void deleteSubscriptionQueue(String queueName) throws JMSException
+ void deleteQueue(String queueName) throws JMSException
{
try
{
@@ -1198,7 +1240,7 @@
public TemporaryQueue createTemporaryQueue() throws JMSException
{
checkNotClosed();
- return new AMQTemporaryQueue();
+ return new AMQTemporaryQueue(this);
}
public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -1214,14 +1256,14 @@
if (subscriber != null)
{
// send a queue.delete for the subscription
- deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
_subscriptions.remove(name);
}
else
{
if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
{
- deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else
{
@@ -1230,12 +1272,12 @@
}
}
- private boolean isQueueBound(String queueName) throws JMSException
+ boolean isQueueBound(String queueName) throws JMSException
{
return isQueueBound(queueName, null);
}
- private boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
routingKey, queueName);
@@ -1374,11 +1416,19 @@
* Called by the MessageConsumer when closing, to deregister the consumer from the
* map from consumerTag to consumer instance.
*
- * @param consumerTag the consumer tag, that was broker-generated
+ * @param consumer the consumer being closed; its consumer tag and destination are used to deregister it
*/
- void deregisterConsumer(String consumerTag)
+ void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumerTag);
+ _consumers.remove(consumer.getConsumerTag());
+ Destination dest = consumer.getDestination();
+ synchronized(dest)
+ {
+ if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
+ }
}
private void registerProducer(long producerId, MessageProducer producer)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Sat Dec 16 05:36:26 2006
@@ -29,22 +29,32 @@
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
{
+ private final AMQSession _session;
/**
* Create a new instance of an AMQTemporaryQueue
*/
- public AMQTemporaryQueue()
+ public AMQTemporaryQueue(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ _session = session;
}
/**
* @see javax.jms.TemporaryQueue#delete()
*/
- public void delete() throws JMSException
+ public synchronized void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+ }
+
+ if(_session.isQueueBound(getQueueName()))
+ {
+ _session.deleteQueue(getQueueName());
+ }
+
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487801&r1=487800&r2=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sat Dec 16 05:36:26 2006
@@ -524,7 +524,7 @@
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(this);
}
public String getConsumerTag()
Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?view=auto&rev=487801
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Sat Dec 16 05:36:26 2006
@@ -0,0 +1,81 @@
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.unit.client.connection.ConnectionTest;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ protected Connection createConnection() throws AMQException, URLSyntaxException
+ {
+ return new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}