You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/02 21:59:18 UTC

svn commit: r1428029 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java

Author: tabish
Date: Wed Jan  2 20:59:18 2013
New Revision: 1428029

URL: http://svn.apache.org/viewvc?rev=1428029&view=rev
Log:
fix test cases after changes in https://issues.apache.org/jira/browse/AMQ-4237 broke the test's queue MBean lookup

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=1428029&r1=1428028&r2=1428029&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Wed Jan  2 20:59:18 2013
@@ -17,11 +17,8 @@
 package org.apache.activemq.network;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeNotNull;
 
 import java.io.File;
-import java.net.MalformedURLException;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -31,38 +28,35 @@ import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.TopicSubscriber;
-import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NetworkBrokerDetachTest {
 
-	private final static String BROKER_NAME = "broker";
-	private final static String REM_BROKER_NAME = "networkedBroker";
-	private final static String DESTINATION_NAME = "testQ";
-	private final static int    NUM_CONSUMERS = 1;
-	
+    private final static String BROKER_NAME = "broker";
+    private final static String REM_BROKER_NAME = "networkedBroker";
+    private final static String DESTINATION_NAME = "testQ";
+    private final static int NUM_CONSUMERS = 1;
+
     protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
     protected final int numRestarts = 3;
     protected final int networkTTL = 2;
     protected final boolean dynamicOnly = false;
-    
+
     protected BrokerService broker;
     protected BrokerService networkedBroker;
 
@@ -86,42 +80,36 @@ public class NetworkBrokerDetachTest {
         configureNetworkConnector(networkConnector);
         return broker;
     }
-    
+
     private void configureNetworkConnector(NetworkConnector networkConnector) {
         networkConnector.setDuplex(false);
         networkConnector.setNetworkTTL(networkTTL);
         networkConnector.setDynamicOnly(dynamicOnly);
     }
-    
+
     // variants for each store....
     protected void configureBroker(BrokerService broker) throws Exception {
-        //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
-        //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
-        //broker.setPersistenceAdapter(persistenceAdapter);        
-        
         KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
         persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest"));
         broker.setPersistenceAdapter(persistenceAdapter);
-        
-        // default AMQ
     }
-    
+
     @Before
     public void init() throws Exception {
         broker = createBroker();
         broker.setDeleteAllMessagesOnStartup(true);
         broker.start();
-        
+
         networkedBroker = createNetworkedBroker();
         networkedBroker.setDeleteAllMessagesOnStartup(true);
         networkedBroker.start();
     }
-    
+
     @After
     public void cleanup() throws Exception {
         networkedBroker.stop();
         networkedBroker.waitUntilStopped();
-        
+
         broker.stop();
         broker.waitUntilStopped();
     }
@@ -129,7 +117,7 @@ public class NetworkBrokerDetachTest {
     @Test
     public void testNetworkedBrokerDetach() throws Exception {
         LOG.info("Creating Consumer on the networked broker ...");
-        // Create a consumer on the networked broker 
+        // Create a consumer on the networked broker
         ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
         Connection consConn = consFactory.createConnection();
         Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -137,64 +125,63 @@ public class NetworkBrokerDetachTest {
         for(int i=0; i<NUM_CONSUMERS; i++) {
             consSession.createConsumer(destination);
         }
-        
-        assertTrue("got expected consumer count from mbean within time limit", 
-                verifyConsumerCount(1, destination, broker));
-        
-        
+
+        assertTrue("got expected consumer count from mbean within time limit",
+                   verifyConsumerCount(1, destination, broker));
+
         LOG.info("Stopping Consumer on the networked broker ...");
-        // Closing the connection will also close the consumer 
+        // Closing the connection will also close the consumer
         consConn.close();
-        
+
         // We should have 0 consumer for the queue on the local broker
         assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker));
     }
 
-    
     @Test
     public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
-        
+
         final AtomicInteger count = new AtomicInteger(0);
         MessageListener counter = new MessageListener() {
+            @Override
             public void onMessage(Message message) {
                 count.incrementAndGet();
             }
         };
-        
+
         LOG.info("Creating durable consumer on each broker ...");
         ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter);
         registerDurableConsumer(broker, counter);
-        
+
         assertTrue("got expected consumer count from local broker mbean within time limit",
                 verifyConsumerCount(2, destination, broker));
-        
+
         assertTrue("got expected consumer count from network broker mbean within time limit",
                 verifyConsumerCount(2, destination, networkedBroker));
-        
+
         sendMessageTo(destination, broker);
-        
+
         assertTrue("Got one message on each", verifyMessageCount(2, count));
-        
+
         LOG.info("Stopping brokerTwo...");
         networkedBroker.stop();
-        networkedBroker.waitUntilStopped();           
-        
+        networkedBroker.waitUntilStopped();
+
         LOG.info("restarting  broker Two...");
         networkedBroker = createNetworkedBroker();
         networkedBroker.start();
-   
+
         LOG.info("Recreating durable Consumer on the broker after restart...");
         registerDurableConsumer(networkedBroker, counter);
-        
+
         // give advisories a chance to percolate
         TimeUnit.SECONDS.sleep(5);
-        
+
         sendMessageTo(destination, broker);
-        
+
         // expect similar after restart
         assertTrue("got expected consumer count from local broker mbean within time limit",
                 verifyConsumerCount(2, destination, broker));
- 
+
         // a durable sub is auto bridged on restart unless dynamicOnly=true
         assertTrue("got expected consumer count from network broker mbean within time limit",
                 verifyConsumerCount(2, destination, networkedBroker));
@@ -209,9 +196,10 @@ public class NetworkBrokerDetachTest {
 
     private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 return i == count.get();
-            }      
+            }
         });
     }
 
@@ -237,10 +225,9 @@ public class NetworkBrokerDetachTest {
         session.createProducer(destination).send(session.createTextMessage("Hi"));
         conn.close();
     }
-    
+
     protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
-        
-        String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+        String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
         connectionFactory.setOptimizedMessageDispatch(true);
         connectionFactory.setCopyMessageOnSend(false);
@@ -256,70 +243,61 @@ public class NetworkBrokerDetachTest {
         connectionFactory.setAlwaysSyncSend(true);
         return connectionFactory;
     }
-    
-    // JMX Helper Methods 
+
+    // JMX Helper Methods
     private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 boolean result = false;
                 try {
+
+                    ObjectName[] destinations;
+
+                    if (destination.isQueue()) {
+                        destinations = broker.getAdminView().getQueues();
+                    } else {
+                        destinations = broker.getAdminView().getTopics();
+                    }
+
                     // We should have 1 consumer for the queue on the local broker
-                    Object consumers = broker.getManagementContext().getAttribute(getObjectName(broker.getBrokerName(), destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName()), "ConsumerCount");
-                    if (consumers != null) {
-                        LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + consumers);
-                        if (expectedCount == ((Long)consumers).longValue()) {
-                            result = true;
+                    for (ObjectName name : destinations) {
+                        DestinationViewMBean view = (DestinationViewMBean)
+                            broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+
+                        if (view.getName().equals(destination.getPhysicalName())) {
+                            LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + view.getConsumerCount());
+                            if (expectedCount == view.getConsumerCount()) {
+                                result = true;
+                            }
                         }
                     }
+
                 } catch (Exception ignoreAndRetry) {
                 }
                 return result;
-            }      
+            }
         });
     }
-    
-    
+
     private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
+            @Override
             public boolean isSatisified() throws Exception {
                 boolean result = false;
-                MBeanServerConnection mbsc = getMBeanServerConnection();
-                if (mbsc != null) {
-                    Set subs = broker.getManagementContext().queryNames(getObjectName(broker.getBrokerName(), "Subscription", "active=false,*"), null);
+                BrokerView view = broker.getAdminView();
+
+                if (view != null) {
+                    ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
                     if (subs != null) {
                         LOG.info("inactive durable subs on " + broker + " : " + subs);
-                        if (expectedCount == subs.size()) {
+                        if (expectedCount == subs.length) {
                             result = true;
                         }
                     }
                 }
                 return result;
-            }      
+            }
         });
     }
-
-    private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
-        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
-        MBeanServerConnection mbsc = null;
-        try {
-            JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
-            mbsc = jmxc.getMBeanServerConnection();
-        } catch (Exception ignored) {
-            LOG.warn("getMBeanServer ex: " + ignored);
-        }
-        // If port 1099 is in use when the Broker starts, starting the jmx
-        // connector will fail.  So, if we have no mbsc to query, skip the
-        // test.
-        assumeNotNull(mbsc);
-        return mbsc;
-    }
-    
-    
-    private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
-      ObjectName beanName = new ObjectName(
-        "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
-      );
-      
-      return beanName;
-    }
 }