You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/27 21:11:40 UTC
svn commit: r397613 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/ test/java/org/apache/act...
Author: chirino
Date: Thu Apr 27 12:11:38 2006
New Revision: 397613
URL: http://svn.apache.org/viewcvs?rev=397613&view=rev
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-695
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 27 12:11:38 2006
@@ -1555,7 +1555,7 @@
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
- info.setTimeout(1000*5);
+ info.setTimeout(0);
syncSendPacket(info);
}
@@ -1590,7 +1590,7 @@
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
- info.setTimeout(1000*5);
+ info.setTimeout(0);
syncSendPacket(info);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Apr 27 12:11:38 2006
@@ -147,20 +147,6 @@
return answer;
}
- public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
- next.removeDestination(context, destination, timeout);
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
- DestinationInfo info = (DestinationInfo) destinations.remove(destination);
- if( info !=null && info.getDestination() != null && topic != null) {
- info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
- fireAdvisory(context, topic, info);
- next.removeDestination(context,topic,timeout);
- next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
- next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), timeout);
- }
-
- }
-
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
ActiveMQDestination destination = info.getDestination();
next.addDestinationInfo(context, info);
@@ -170,18 +156,45 @@
destinations.put(destination, info);
}
- public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
- next.removeDestinationInfo(context, info);
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
- fireAdvisory(context, topic, info);
- try {
- next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), 0);
- } catch (Exception expectedIfDestinationDidNotExistYet) {
+ public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
+ next.removeDestination(context, destination, timeout);
+ DestinationInfo info = (DestinationInfo) destinations.remove(destination);
+ if( info !=null ) {
+ info.setDestination(destination);
+ info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+ ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
+ fireAdvisory(context, topic, info);
+ try {
+ next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
+ } catch (Exception expectedIfDestinationDidNotExistYet) {
+ }
+ try {
+ next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
+ } catch (Exception expectedIfDestinationDidNotExistYet) {
+ }
}
- try {
- next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), 0);
- } catch (Exception expectedIfDestinationDidNotExistYet) {
+
+ }
+
+ public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception{
+ next.removeDestinationInfo(context, destInfo);
+ DestinationInfo info = (DestinationInfo) destinations.remove(destInfo.getDestination());
+
+ if( info !=null ) {
+ info.setDestination(destInfo.getDestination());
+ info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+ ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
+ fireAdvisory(context, topic, info);
+ try {
+ next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
+ } catch (Exception expectedIfDestinationDidNotExistYet) {
+ }
+ try {
+ next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
+ } catch (Exception expectedIfDestinationDidNotExistYet) {
+ }
}
+
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Apr 27 12:11:38 2006
@@ -17,12 +17,8 @@
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
+
import org.apache.activemq.Service;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
public interface BrokerViewMBean extends Service {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=397613&r1=397612&r2=397613&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Apr 27 12:11:38 2006
@@ -23,7 +23,6 @@
import javax.jms.JMSException;
-import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
@@ -94,20 +93,41 @@
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
- // The destination cannot be removed if there are any active subscriptions
- for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
- Subscription sub=(Subscription) iter.next();
- if(sub.matches(destination)){
- throw new JMSException("Destination still has an active subscription: "+destination);
+
+ // No timeout.. then try to shut down right way, fails if there are current subscribers.
+ if( timeout == 0 ) {
+ for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+ Subscription sub=(Subscription) iter.next();
+ if(sub.matches(destination)){
+ throw new JMSException("Destination still has an active subscription: "+destination);
+ }
}
}
+
+ if( timeout > 0 ) {
+ // TODO: implement a way to notify the subscribers that we want to take the down
+ // the destination and that they should un-subscribe.. Then wait up to timeout time before
+ // dropping the subscription.
+
+ }
+
log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination);
if(dest!=null){
+
+ // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
+ for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
+ Subscription sub=(Subscription) iter.next();
+ if(sub.matches(destination)){
+ dest.removeSubscription(context, sub);
+ }
+ }
+
destinationMap.removeAll(destination);
dest.dispose(context);
dest.stop();
+
}else{
log.debug("Destination doesn't exist: " + dest);
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java?rev=397613&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java Thu Apr 27 12:11:38 2006
@@ -0,0 +1,147 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @version $Revision: 397249 $
+ */
+public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
+
+ protected int consumerCounter;
+ protected ConsumerEventSource topicConsumerEventSource;
+ private ConsumerEventSource queueConsumerEventSource;
+
+ protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000);
+ private Connection connection;
+ private Session session;
+ private ActiveMQTempTopic tempTopic;
+ private ActiveMQTempQueue tempQueue;
+
+ public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
+ topicConsumerEventSource.start();
+
+ MessageConsumer consumer = createConsumer(tempTopic);
+ assertConsumerEvent(1, true);
+
+ Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
+ assertTrue( destinationExists(advisoryTopic) );
+
+ consumer.close();
+
+ // Once we delete the topic, the advisory topic for the destination should also be deleted.
+ tempTopic.delete();
+
+ assertFalse( destinationExists(advisoryTopic) );
+ }
+
+ public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
+ queueConsumerEventSource.start();
+
+ MessageConsumer consumer = createConsumer(tempQueue);
+ assertConsumerEvent(1, true);
+
+ Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
+ assertTrue( destinationExists(advisoryTopic) );
+
+ consumer.close();
+
+ // Once we delete the queue, the advisory topic for the destination should also be deleted.
+ tempQueue.delete();
+
+ assertFalse( destinationExists(advisoryTopic) );
+ }
+
+ private boolean destinationExists(Destination dest) throws Exception {
+ RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
+ return rb.getTopicRegion().getDestinationMap().containsKey(dest)
+ || rb.getQueueRegion().getDestinationMap().containsKey(dest)
+ || rb.getTempTopicRegion().getDestinationMap().containsKey(dest)
+ || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
+ }
+
+ public void onConsumerEvent(ConsumerEvent event) {
+ eventQueue.add(event);
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ connection.start();
+
+ session = connection.createSession(false, 0);
+
+ tempTopic = (ActiveMQTempTopic) session.createTemporaryTopic();
+ topicConsumerEventSource = new ConsumerEventSource(connection, tempTopic);
+ topicConsumerEventSource.setConsumerListener(this);
+
+ tempQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
+ queueConsumerEventSource = new ConsumerEventSource(connection, tempQueue);
+ queueConsumerEventSource.setConsumerListener(this);
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+
+ protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+ ConsumerEvent event = waitForConsumerEvent();
+ assertEquals("Consumer count", count, event.getConsumerCount());
+ assertEquals("started", started, event.isStarted());
+ }
+
+ protected MessageConsumer createConsumer(Destination dest) throws JMSException {
+ final String consumerText = "Consumer: " + (++consumerCounter);
+ log.info("Creating consumer: " + consumerText + " on destination: " + dest);
+
+ MessageConsumer consumer = session.createConsumer(dest);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ log.info("Received message by: " + consumerText + " message: " + message);
+ }
+ });
+ return consumer;
+ }
+
+ protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
+ ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+ assertTrue("Should have received a consumer event!", answer != null);
+ return answer;
+ }
+
+}