You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by tm...@apache.org on 2013/06/05 13:51:42 UTC
svn commit: r1489823 -
/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
Author: tmielke
Date: Wed Jun 5 11:51:41 2013
New Revision: 1489823
URL: http://svn.apache.org/r1489823
Log:
AMQ-4571: JUnit test added
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java?rev=1489823&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java Wed Jun 5 11:51:41 2013
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Regression test for AMQ-4571.
+ * <p>
+ * Checks that a durable subscription is fully unregistered from JMX when the
+ * broker uses nested destination interceptors. A durable subscription shows up
+ * in two places: in the {@code Subscriptions} attribute of its topic MBean and
+ * as a consumer MBean of its own. Both are verified here.
+ */
+public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport {
+
+    public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=";
+
+    private static final Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class);
+
+    /**
+     * Client id of the single JMS connection opened by the test. It is part of
+     * the JMX name of every consumer MBean looked up by
+     * {@link #isSubRegisteredInJmx(String, String)}.
+     */
+    private static final String CONNECTION_CLIENT_ID = "myId1";
+
+    /** Connector backing {@link #mbsc}; kept so that it can be closed in {@link #tearDown()}. */
+    private JMXConnector jmxConnector;
+
+    /** Lazily created JMX connection to the broker under test. */
+    private MBeanServerConnection mbsc;
+
+    /**
+     * Tests AMQ-4571: creates two durable subscriptions, removes them one after
+     * the other and verifies after each step what is registered in JMX.
+     *
+     * @throws Exception on any JMS error
+     */
+    public void testVirtualTopicRemoval() throws Exception {
+
+        LOG.debug("Running testVirtualTopicRemoval()");
+        // Subscription names; the first one is identical to the connection's client id.
+        String subName1 = "myId1";
+        String subName2 = "myId2";
+
+        Connection conn = null;
+        Session session = null;
+
+        try {
+            assertTrue(broker.isStarted());
+            String topicName = destination.getPhysicalName();
+
+            // create both durable subs on the same connection/session
+            conn = createConnection();
+            conn.setClientID(CONNECTION_CLIENT_ID);
+            conn.start();
+            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, subName1);
+            TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, subName2);
+
+            // verify two subs registered in JMX
+            assertTrue("subscription count could not be verified", assertSubscriptionCount(topicName, 2));
+            assertTrue(isSubRegisteredInJmx(topicName, subName1));
+            assertTrue(isSubRegisteredInJmx(topicName, subName2));
+
+            // delete sub 1
+            sub1.close();
+            session.unsubscribe(subName1);
+
+            // verify only one sub registered in JMX
+            assertTrue("subscription count could not be verified", assertSubscriptionCount(topicName, 1));
+            assertFalse(isSubRegisteredInJmx(topicName, subName1));
+            assertTrue(isSubRegisteredInJmx(topicName, subName2));
+
+            // delete sub 2
+            sub2.close();
+            session.unsubscribe(subName2);
+
+            // verify no sub registered in JMX
+            assertTrue("subscription count could not be verified", assertSubscriptionCount(topicName, 0));
+            assertFalse(isSubRegisteredInJmx(topicName, subName1));
+            assertFalse(isSubRegisteredInJmx(topicName, subName2));
+        } finally {
+            // Either resource may still be null if setup failed; guard so that a
+            // NullPointerException does not hide the original failure, and make
+            // sure the connection is closed even if closing the session throws.
+            try {
+                if (session != null) {
+                    session.close();
+                }
+            } finally {
+                if (conn != null) {
+                    conn.close();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Connects to the broker using JMX. The connector of the most recent call
+     * is remembered and closed in {@link #tearDown()}.
+     *
+     * @return The JMX connection
+     * @throws IOException in case of any errors
+     */
+    protected MBeanServerConnection connectJMXBroker() throws IOException {
+        // NOTE(review): port 1299 presumably matches the management context of
+        // the broker XML configuration - confirm against that file.
+        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi");
+        jmxConnector = JMXConnectorFactory.connect(url, null);
+        MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
+        LOG.debug("JMX connection established");
+        return connection;
+    }
+
+    /**
+     * Asserts that the Subscriptions JMX attribute of a topic has the expected
+     * count. A count mismatch fails the test immediately through
+     * {@code assertEquals}.
+     *
+     * @param topicName name of the topic destination
+     * @param expectedCount expected number of subscriptions
+     * @return true if the attribute could be read (and the count matched),
+     *         false if the JMX lookup itself failed; callers must check this
+     *         value, otherwise a JMX failure goes unnoticed
+     */
+    protected boolean assertSubscriptionCount(String topicName, int expectedCount) {
+        try {
+            ObjectName[] subscriptions = querySubscriptions(topicName);
+            assertEquals(expectedCount, subscriptions.length);
+        } catch (Exception ex) {
+            LOG.error("Unable to read the Subscriptions attribute of topic " + topicName, ex);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Checks if a subscription for topic topicName with subName is registered in JMX.
+     * <p>
+     * A durable sub is registered under the Subscriptions JMX attribute of the
+     * topic and as its own ObjectInstance under the topic's Consumer namespace.
+     * AMQ-4571 only removed the latter, not the former, on unsubscribe(), so
+     * both are checked and the subscription only counts as registered when it
+     * is found in both places.
+     *
+     * @param topicName physical name of topic destination (excluding prefix 'topic://')
+     * @param subName name of the durable subscription
+     * @return true if registered, false otherwise (including when the JMX lookup fails)
+     */
+    protected boolean isSubRegisteredInJmx(String topicName, String subName) {
+        String consumerId = "Durable(" + CONNECTION_CLIENT_ID + "_" + subName + ")";
+
+        try {
+            ObjectName[] subscriptions = querySubscriptions(topicName);
+
+            ObjectName consumerName = new ObjectName(JMX_CONTEXT_BASE_NAME
+                + topicName
+                + ",endpoint=Consumer,clientId=" + CONNECTION_CLIENT_ID
+                + ",consumerId=" + consumerId);
+            // throws InstanceNotFoundException when no such MBean is registered
+            ObjectInstance instance = mbsc.getObjectInstance(consumerName);
+            if (instance == null) {
+                return false;
+            }
+
+            // Compare the consumerId key exactly. A plain substring match on
+            // subName would also hit the clientId part of unrelated names.
+            for (ObjectName subscription : subscriptions) {
+                if (consumerId.equals(subscription.getKeyProperty("consumerId"))) {
+                    return true;
+                }
+            }
+        } catch (InstanceNotFoundException ine) {
+            // this may be expected so log at info level
+            LOG.info(ine.toString());
+            return false;
+        } catch (Exception ex) {
+            LOG.error("JMX lookup failed for subscription " + subName + " on topic " + topicName, ex);
+            return false;
+        }
+        return false;
+    }
+
+    /**
+     * Reads the Subscriptions attribute of the given topic's MBean, opening
+     * the JMX connection on first use.
+     */
+    private ObjectName[] querySubscriptions(String topicName) throws Exception {
+        if (mbsc == null) {
+            mbsc = connectJMXBroker();
+        }
+        return (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
+    }
+
+    /**
+     * Closes the JMX connector opened by {@link #connectJMXBroker()}, if any.
+     * Failures are only logged so that they cannot prevent the remaining tear down.
+     */
+    private void closeJmxConnection() {
+        JMXConnector connector = jmxConnector;
+        jmxConnector = null;
+        mbsc = null;
+        if (connector != null) {
+            try {
+                connector.close();
+            } catch (IOException ex) {
+                LOG.warn("Failed to close JMX connector", ex);
+            }
+        }
+    }
+
+
+    /**
+     * Releases the JMX connection before delegating to the parent tear down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            closeJmxConnection();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+
+    /**
+     * Creates the broker from the XML configuration returned by
+     * {@link #getBrokerConfigUri()}, with persistence disabled. Also switches
+     * the test to topic destinations by setting {@code useTopic}.
+     */
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+
+        // lets disable persistence as we are a test
+        answer.setPersistent(false);
+        useTopic = true;
+        return answer;
+    }
+
+
+    /**
+     * @return location of the broker XML configuration used by this test
+     */
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml";
+    }
+
+
+    /**
+     * Simple but custom topic interceptor.
+     * To be used for testing nested interceptors in conjunction with
+     * virtual topic interceptor.
+     */
+    public static class SimpleDestinationInterceptor implements DestinationInterceptor {
+
+        private static final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class);
+        private BrokerService broker;
+
+        public SimpleDestinationInterceptor() {
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+         */
+        public void setBrokerService(BrokerService brokerService) {
+            LOG.info("setBrokerService()");
+            this.broker = brokerService;
+        }
+
+        /**
+         * Wraps the destination in a pass-through {@link DestinationFilter}
+         * that only logs each send. Destinations whose physical name starts
+         * with "ActiveMQ" are returned unwrapped.
+         *
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination)
+         */
+        public Destination intercept(final Destination destination) {
+            LOG.info("intercept({})", destination.getName());
+
+            // NOTE(review): the "ActiveMQ" prefix presumably identifies the
+            // broker's own advisory destinations - confirm.
+            if (destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) {
+                return destination;
+            }
+
+            return new DestinationFilter(destination) {
+                @Override
+                public void send(ProducerBrokerExchange context, Message message) throws Exception {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("SimpleDestinationInterceptor: Sending message to destination:"
+                            + this.getActiveMQDestination().getPhysicalName());
+                    }
+                    // Send message to Destination
+                    super.send(context, message);
+                }
+            };
+        }
+
+
+        /**
+         * Logs the removal and drops the reference to the broker service.
+         *
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination)
+         */
+        public void remove(Destination destination) {
+            LOG.info("remove({})", destination.getName());
+            this.broker = null;
+        }
+
+
+        /**
+         * Only logs the call; no destination is created here.
+         *
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination)
+         */
+        public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+            // Parameterized logging copes with a null context and closes the parenthesis.
+            LOG.info("create({}, {}, {})", new Object[] {broker.getBrokerName(), context, destination.getPhysicalName()});
+        }
+    }
+}