You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/27 21:11:40 UTC

svn commit: r397613 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/act...

Author: chirino
Date: Thu Apr 27 12:11:38 2006
New Revision: 397613

URL: http://svn.apache.org/viewcvs?rev=397613&view=rev
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-695

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 27 12:11:38 2006
@@ -1555,7 +1555,7 @@
         info.setConnectionId(this.info.getConnectionId());
         info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
         info.setDestination(destination);
-        info.setTimeout(1000*5);
+        info.setTimeout(0);
         syncSendPacket(info);
     }
 
@@ -1590,7 +1590,7 @@
         info.setConnectionId(this.info.getConnectionId());
         info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
         info.setDestination(destination);
-        info.setTimeout(1000*5);
+        info.setTimeout(0);
         syncSendPacket(info);
 
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Apr 27 12:11:38 2006
@@ -147,20 +147,6 @@
         return answer;
     }
     
-    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
-        next.removeDestination(context, destination, timeout);
-        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
-        DestinationInfo info = (DestinationInfo) destinations.remove(destination);
-        if( info !=null && info.getDestination() != null && topic != null) {
-            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
-            fireAdvisory(context, topic, info);
-            next.removeDestination(context,topic,timeout);
-            next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout); 
-            next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), timeout); 
-        }
-       
-    }
-    
     public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
         ActiveMQDestination destination =  info.getDestination();
         next.addDestinationInfo(context, info);  
@@ -170,18 +156,45 @@
         destinations.put(destination, info);    
     }
 
-    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
-        next.removeDestinationInfo(context, info);
-        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
-        fireAdvisory(context, topic, info);
-        try {
-            next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), 0);
-        } catch (Exception expectedIfDestinationDidNotExistYet) {
+    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
+        next.removeDestination(context, destination, timeout);
+        DestinationInfo info = (DestinationInfo) destinations.remove(destination);
+        if( info !=null ) {
+            info.setDestination(destination);
+            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
+            fireAdvisory(context, topic, info);
+            try {
+                next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
+            } catch (Exception expectedIfDestinationDidNotExistYet) {
+            }
+            try {
+                next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
+            } catch (Exception expectedIfDestinationDidNotExistYet) {
+            }
         }
-        try {
-            next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), 0); 
-        } catch (Exception expectedIfDestinationDidNotExistYet) {
+       
+    }
+    
+    public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception{
+        next.removeDestinationInfo(context, destInfo);
+        DestinationInfo info = (DestinationInfo) destinations.remove(destInfo.getDestination());
+
+        if( info !=null ) {
+            info.setDestination(destInfo.getDestination());
+            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
+            fireAdvisory(context, topic, info);
+            try {
+                next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
+            } catch (Exception expectedIfDestinationDidNotExistYet) {
+            }
+            try {
+                next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
+            } catch (Exception expectedIfDestinationDidNotExistYet) {
+            }
         }
+
     }
 
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Apr 27 12:11:38 2006
@@ -17,12 +17,8 @@
 package org.apache.activemq.broker.jmx;
 
 import javax.management.ObjectName;
+
 import org.apache.activemq.Service;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
 
 public interface BrokerViewMBean extends Service {
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Apr 27 12:11:38 2006
@@ -23,7 +23,6 @@
 
 import javax.jms.JMSException;
 
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
@@ -94,20 +93,41 @@
 
     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
                     throws Exception{
-        // The destination cannot be removed if there are any active subscriptions
-        for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
-            Subscription sub=(Subscription) iter.next();
-            if(sub.matches(destination)){
-                throw new JMSException("Destination still has an active subscription: "+destination);
+        
+        // No timeout.. then try to shut down right way, fails if there are current subscribers.
+        if( timeout == 0 ) {
+            for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+                Subscription sub=(Subscription) iter.next();
+                if(sub.matches(destination)){
+                    throw new JMSException("Destination still has an active subscription: "+destination);
+                }
             }
         }
+        
+        if( timeout > 0 ) {
+            // TODO: implement a way to notify the subscribers that we want to take the down 
+            // the destination and that they should un-subscribe..  Then wait up to timeout time before
+            // dropping the subscription.
+        
+        }
+
         log.debug("Removing destination: "+destination);
         synchronized(destinationsMutex){
             Destination dest=(Destination) destinations.remove(destination);
             if(dest!=null){
+                
+                // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
+                for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+                    Subscription sub=(Subscription) iter.next();
+                    if(sub.matches(destination)){
+                        dest.removeSubscription(context, sub);
+                    }
+                }
+                
                 destinationMap.removeAll(destination);
                 dest.dispose(context);
                 dest.stop();
+                
             }else{
                 log.debug("Destination doesn't exist: " + dest);
             }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java?rev=397613&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java Thu Apr 27 12:11:38 2006
@@ -0,0 +1,147 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ * 
+ * @version $Revision: 397249 $
+ */
+public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
+
+    protected int consumerCounter;
+    protected ConsumerEventSource topicConsumerEventSource;
+    private ConsumerEventSource queueConsumerEventSource;
+
+    protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000);
+    private Connection connection;
+    private Session session;
+    private ActiveMQTempTopic tempTopic;
+    private ActiveMQTempQueue tempQueue;
+
+    public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
+        topicConsumerEventSource.start();
+
+        MessageConsumer consumer = createConsumer(tempTopic);
+        assertConsumerEvent(1, true);
+
+        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
+        assertTrue( destinationExists(advisoryTopic) );
+        
+        consumer.close();
+        
+        // Once we delete the topic, the advisory topic for the destination should also be deleted.
+        tempTopic.delete();
+        
+        assertFalse( destinationExists(advisoryTopic) );
+    }
+
+    public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
+        queueConsumerEventSource.start();
+
+        MessageConsumer consumer = createConsumer(tempQueue);
+        assertConsumerEvent(1, true);
+
+        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
+        assertTrue( destinationExists(advisoryTopic) );
+        
+        consumer.close();
+        
+        // Once we delete the queue, the advisory topic for the destination should also be deleted.
+        tempQueue.delete();
+        
+        assertFalse( destinationExists(advisoryTopic) );
+    }
+
+    private boolean destinationExists(Destination dest) throws Exception {
+        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
+        return rb.getTopicRegion().getDestinationMap().containsKey(dest)
+                || rb.getQueueRegion().getDestinationMap().containsKey(dest)
+                || rb.getTempTopicRegion().getDestinationMap().containsKey(dest)
+                || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
+    }
+
+    public void onConsumerEvent(ConsumerEvent event) {
+        eventQueue.add(event);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+        connection.start();
+        
+        session = connection.createSession(false, 0);
+        
+        tempTopic = (ActiveMQTempTopic) session.createTemporaryTopic();
+        topicConsumerEventSource = new ConsumerEventSource(connection, tempTopic);
+        topicConsumerEventSource.setConsumerListener(this);
+    
+        tempQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
+        queueConsumerEventSource = new ConsumerEventSource(connection, tempQueue);
+        queueConsumerEventSource.setConsumerListener(this);
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+        ConsumerEvent event = waitForConsumerEvent();
+        assertEquals("Consumer count", count, event.getConsumerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    protected MessageConsumer createConsumer(Destination dest) throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        log.info("Creating consumer: " + consumerText + " on destination: " + dest);
+        
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                log.info("Received message by: " + consumerText + " message: " + message);
+            }
+        });
+        return consumer;
+    }
+
+    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
+        ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have received a consumer event!", answer != null);
+        return answer;
+    }
+
+}