You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/09/13 21:10:46 UTC
svn commit: r996646 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/RegionBroker.java
test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java
Author: tabish
Date: Mon Sep 13 19:10:46 2010
New Revision: 996646
URL: http://svn.apache.org/viewvc?rev=996646&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2915
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=996646&r1=996645&r2=996646&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Sep 13 19:10:46 2010
@@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
@@ -320,9 +321,11 @@ public class RegionBroker extends EmptyB
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeDestination(context, destination, timeout);
+ removeAdvisoryTopics("Queue.", context, destination, timeout);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeDestination(context, destination, timeout);
+ removeAdvisoryTopics("Topic.", context, destination, timeout);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeDestination(context, destination, timeout);
@@ -334,10 +337,30 @@ public class RegionBroker extends EmptyB
throw createUnknownDestinationTypeException(destination);
}
destinations.remove(destination);
+
}
}
+ public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
+ if (this.brokerService.isAdvisorySupport()) {
+ String producerAdvisoryTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
+ String consumerAdvisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
+
+ ActiveMQDestination dests[] = getDestinations();
+ for (ActiveMQDestination dest: dests) {
+ String name = dest.getPhysicalName();
+ if ( name.equals(producerAdvisoryTopic) || name.equals(consumerAdvisoryTopic) ) {
+ try {
+ removeDestination(context, dest, timeout);
+ } catch (JMSException ignore) {
+ // at least ignore the Unknown Destination Type JMSException
+ }
+ }
+ }
+ }
+ }
+
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
addDestination(context, info.getDestination(),true);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java?rev=996646&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java Mon Sep 13 19:10:46 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AdvisoryTopicDeletionTest extends TestSupport {
+ private static final Log LOG = LogFactory.getLog(AdvisoryTopicDeletionTest.class);
+
+ private BrokerService broker;
+ private Connection connection;
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://" + getName());
+ }
+
+ protected void setUp() throws Exception {
+ createBroker();
+ topic = false;
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ destroyBroker();
+ }
+
+ private void createBroker() throws Exception {
+ broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+ broker.setPersistent(false);
+ broker.setBrokerName(getName());
+ broker.start();
+
+ connection = createConnection();
+ }
+
+ @Override
+ protected Connection createConnection() throws Exception {
+ Connection con = super.createConnection();
+ con.start();
+ return con;
+ }
+
+ private void destroyBroker() throws Exception {
+ if (connection != null)
+ connection.close();
+ if (broker != null)
+ broker.stop();
+ }
+
+ public void doTest() throws Exception {
+ Destination dest = createDestination();
+
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = consumerSession.createConsumer(dest);
+
+ MessageProducer prod = producerSession.createProducer(dest);
+ Message message = producerSession.createMessage();
+ prod.send(message);
+
+ consumer.receive(60 * 1000);
+ connection.close();
+ connection = null;
+
+ if ( topic ) {
+ broker.getAdminView().removeTopic(((ActiveMQDestination)dest).getPhysicalName());
+ } else {
+ broker.getAdminView().removeQueue(((ActiveMQDestination)dest).getPhysicalName());
+ }
+
+ ActiveMQDestination dests[] = broker.getRegionBroker().getDestinations();
+ int matchingDestinations = 0;
+ for (ActiveMQDestination destination: dests) {
+ String name = destination.getPhysicalName();
+ LOG.debug("Found destination " + name);
+ if (name.startsWith("ActiveMQ.Advisory") && name.contains(getDestinationString())) {
+ matchingDestinations++;
+ }
+ }
+
+ assertEquals("No matching destinations should be found", 0, matchingDestinations);
+ }
+
+ public void testTopic() throws Exception {
+ topic=true;
+ doTest();
+ }
+
+ public void testQueue() throws Exception {
+ topic=false;
+ doTest();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicDeletionTest.java
------------------------------------------------------------------------------
svn:eol-style = native