You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/09 23:36:17 UTC

svn commit: r1431123 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Author: tabish
Date: Wed Jan  9 22:36:17 2013
New Revision: 1431123

URL: http://svn.apache.org/viewvc?rev=1431123&view=rev
Log:
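Fix another test that fails because of https://issues.apache.org/jira/browse/AMQ-4237: browseQueueWithJmx now builds the queue ObjectName from the brokerName, type=Broker, destinationType and destinationName keys instead of BrokerName, Type and Destination. The remaining changes are cleanup (trailing whitespace, tabs to spaces, @Override annotations, generics, Javadoc).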

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1431123&r1=1431122&r2=1431123&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Wed Jan  9 22:36:17 2013
@@ -39,7 +39,6 @@ import javax.management.ObjectName;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerTestSupport;
 import org.apache.activemq.broker.StubConnection;
@@ -68,87 +67,87 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class duplicates most of the functionality in {@link NetworkTestSupport} 
- * and {@link BrokerTestSupport} because more control was needed over how brokers 
- * and connectors are created. Also, this test asserts message counts via JMX on 
- * each broker. 
- * 
- * @author bsnyder
- *
+ * This class duplicates most of the functionality in {@link NetworkTestSupport}
+ * and {@link BrokerTestSupport} because more control was needed over how brokers
+ * and connectors are created. Also, this test asserts message counts via JMX on
+ * each broker.
  */
 public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSupport*/ {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);
-    
-    private BrokerService localBroker; 
-    private BrokerService remoteBroker; 
+
+    private BrokerService localBroker;
+    private BrokerService remoteBroker;
     private DemandForwardingBridge bridge;
-    
+
     protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
-    protected ArrayList connections = new ArrayList();
-    
+    protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+
     protected TransportConnector connector;
     protected TransportConnector remoteConnector;
-    
+
     protected long idGenerator;
     protected int msgIdGenerator;
     protected int tempDestGenerator;
     protected int maxWait = 4000;
     protected String queueName = "TEST";
-    
+
     protected String amqDomain = "org.apache.activemq";
-        
+
+    @Override
     protected void setUp() throws Exception {
-        
-        // For those who want visual confirmation: 
-        //   Uncomment the following to enable JMX support on a port number to use 
-        //   Jconsole to view each broker. You will need to add some calls to 
-        //   Thread.sleep() to be able to actually slow things down so that you 
-        //   can manually see JMX attrs. 
+
+        // For those who want visual confirmation:
+        //   Uncomment the following to enable JMX support on a port number to use
+        //   Jconsole to view each broker. You will need to add some calls to
+        //   Thread.sleep() to be able to actually slow things down so that you
+        //   can manually see JMX attrs.
 //        System.setProperty("com.sun.management.jmxremote", "");
 //        System.setProperty("com.sun.management.jmxremote.port", "1099");
 //        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
 //        System.setProperty("com.sun.management.jmxremote.ssl", "false");
-        
-        // Create the local broker 
+
+        // Create the local broker
         createBroker();
-        // Create the remote broker 
+        // Create the remote broker
         createRemoteBroker();
-        
+
         // Remove the activemq-data directory from the creation of the remote broker
         FileUtils.deleteDirectory(new File("activemq-data"));
-        
-        // Create a network bridge between the local and remote brokers so that 
+
+        // Create a network bridge between the local and remote brokers so that
         // demand-based forwarding can take place
         NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
         config.setBrokerName("local");
         config.setDispatchAsync(false);
         config.setDuplex(true);
 
-        Transport localTransport = createTransport(); 
+        Transport localTransport = createTransport();
         Transport remoteTransport = createRemoteTransport();
-        
-        // Create a network bridge between the two brokers 
+
+        // Create a network bridge between the two brokers
         bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
         bridge.setBrokerService(localBroker);
         bridge.start();
-        
+
         waitForBridgeFormation();
-        
+
     }
-    
+
     protected void waitForBridgeFormation() throws Exception {
         for (final BrokerService broker : brokers.values()) {
             if (!broker.getNetworkConnectors().isEmpty()) {
-            	// Max wait here is 30 secs
+                // Max wait here is 30 secs
                 Wait.waitFor(new Wait.Condition() {
+                    @Override
                     public boolean isSatisified() throws Exception {
                         return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
                     }});
             }
         }
     }
-    
+
+    @Override
     protected void tearDown() throws Exception {
         bridge.stop();
         localBroker.stop();
@@ -156,11 +155,11 @@ public class BrokerNetworkWithStuckMessa
     }
 
     public void testBrokerNetworkWithStuckMessages() throws Exception {
-        
+
         int sendNumMessages = 10;
         int receiveNumMessages = 5;
-        
-        // Create a producer 
+
+        // Create a producer
         StubConnection connection1 = createConnection();
         ConnectionInfo connectionInfo1 = createConnectionInfo();
         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
@@ -168,41 +167,41 @@ public class BrokerNetworkWithStuckMessa
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
         connection1.send(producerInfo);
-        
-        // Create a destination on the local broker 
+
+        // Create a destination on the local broker
         ActiveMQDestination destinationInfo1 = null;
-        
-        // Send a 10 messages to the local broker 
+
+        // Send a 10 messages to the local broker
         for (int i = 0; i < sendNumMessages; ++i) {
             destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
             connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
         }
-        
-        // Ensure that there are 10 messages on the local broker 
+
+        // Ensure that there are 10 messages on the local broker
         Object[] messages = browseQueueWithJmx(localBroker);
         assertEquals(sendNumMessages, messages.length);
-        
-        
-        // Create a synchronous consumer on the remote broker 
+
+
+        // Create a synchronous consumer on the remote broker
         StubConnection connection2 = createRemoteConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
-        ActiveMQDestination destinationInfo2 = 
+        ActiveMQDestination destinationInfo2 =
             createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
         final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
         connection2.send(consumerInfo2);
-        
-        // Consume 5 of the messages from the remote broker and ack them. 
+
+        // Consume 5 of the messages from the remote broker and ack them.
         for (int i = 0; i < receiveNumMessages; ++i) {
             Message message1 = receiveMessage(connection2, 20000);
             assertNotNull(message1);
             connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
         }
-        
-        // Ensure that there are zero messages on the local broker. This tells 
-        // us that those messages have been prefetched to the remote broker 
+
+        // Ensure that there are zero messages on the local broker. This tells
+        // us that those messages have been prefetched to the remote broker
         // where the demand exists.
         Wait.waitFor(new Wait.Condition() {
             @Override
@@ -215,7 +214,7 @@ public class BrokerNetworkWithStuckMessa
         assertEquals(0, messages.length);
 
         LOG.info("Closing consumer on remote");
-        // Close the consumer on the remote broker 
+        // Close the consumer on the remote broker
         connection2.send(consumerInfo2.createRemoveCommand());
         // also close connection etc.. so messages get dropped from the local consumer  q
         connection2.send(connectionInfo2.createRemoveCommand());
@@ -259,7 +258,7 @@ public class BrokerNetworkWithStuckMessa
         assertNull("Messages have migrated back: " + message1, message1);
 
         // Consume the last 4 messages from the local broker and ack them just
-        // to clean up the queue. 
+        // to clean up the queue.
         int counter = 1;
         for (; counter < receiveNumMessages; counter++) {
             message1 = receiveMessage(connection1);
@@ -289,15 +288,15 @@ public class BrokerNetworkWithStuckMessa
         messages = browseQueueWithJmx(localBroker);
         assertEquals(0, messages.length);
 
-        // Close the consumer on the remote broker 
+        // Close the consumer on the remote broker
         connection2.send(consumerInfo3.createRemoveCommand());
-        
+
         connection1.stop();
         connection2.stop();
     }
-    
+
     protected BrokerService createBroker() throws Exception {
-        localBroker = new BrokerService(); 
+        localBroker = new BrokerService();
         localBroker.setBrokerName("localhost");
         localBroker.setUseJmx(true);
         localBroker.setPersistenceAdapter(null);
@@ -307,11 +306,11 @@ public class BrokerNetworkWithStuckMessa
         configureBroker(localBroker);
         localBroker.start();
         localBroker.waitUntilStarted();
-        
+
         localBroker.getManagementContext().setConnectorPort(2221);
-        
+
         brokers.put(localBroker.getBrokerName(), localBroker);
-        
+
         return localBroker;
     }
 
@@ -337,32 +336,32 @@ public class BrokerNetworkWithStuckMessa
         configureBroker(remoteBroker);
         remoteBroker.start();
         remoteBroker.waitUntilStarted();
-        
+
         remoteBroker.getManagementContext().setConnectorPort(2222);
-        
+
         brokers.put(remoteBroker.getBrokerName(), remoteBroker);
-        
+
         return remoteBroker;
     }
-    
+
     protected Transport createTransport() throws Exception {
         Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
         return transport;
     }
-    
+
     protected Transport createRemoteTransport() throws Exception {
         Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
         return transport;
     }
-    
+
     protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
         return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
     }
-    
+
     protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
         return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
     }
-    
+
     protected String getRemoteURI() {
         return "vm://remotehost";
     }
@@ -370,7 +369,7 @@ public class BrokerNetworkWithStuckMessa
     protected String getLocalURI() {
         return "vm://localhost";
     }
-    
+
     protected StubConnection createConnection() throws Exception {
         Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
         StubConnection connection = new StubConnection(transport);
@@ -384,74 +383,75 @@ public class BrokerNetworkWithStuckMessa
         connections.add(connection);
         return connection;
     }
-    
-    @SuppressWarnings("unchecked")
+
+    @SuppressWarnings({ "unchecked", "unused" })
     private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
-		Object[] messages = null;
-		Connection connection = null;
-		Session session = null;
-
-		try {
-			URI brokerUri = connector.getUri();
-			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
-			connection = connectionFactory.createConnection();
-			connection.start();
-			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			Queue destination = session.createQueue(queueName);
-			QueueBrowser browser = session.createBrowser(destination);
-			List<Message> list = new ArrayList<Message>();
-			for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
-				list.add(enumn.nextElement());
-			}
-			messages = list.toArray();
-		}
-		finally {
-			if (session != null) {
-				session.close();
-			}
-			if (connection != null) {
-				connection.close();
-			}
-		}
-		LOG.info("+Browsed with JMS: " + messages.length);
-		
-		return messages;
-	}
-    
+        Object[] messages = null;
+        Connection connection = null;
+        Session session = null;
+
+        try {
+            URI brokerUri = connector.getUri();
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(queueName);
+            QueueBrowser browser = session.createBrowser(destination);
+            List<Message> list = new ArrayList<Message>();
+            for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
+                list.add(enumn.nextElement());
+            }
+            messages = list.toArray();
+        }
+        finally {
+            if (session != null) {
+                session.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        LOG.info("+Browsed with JMS: " + messages.length);
+
+        return messages;
+    }
+
     private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
         Hashtable<String, String> params = new Hashtable<String, String>();
-        params.put("BrokerName", broker.getBrokerName());
-        params.put("Type", "Queue");
-        params.put("Destination", queueName);
+        params.put("brokerName", broker.getBrokerName());
+        params.put("type", "Broker");
+        params.put("destinationType", "Queue");
+        params.put("destinationName", queueName);
         ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
-        
+
         ManagementContext mgmtCtx = broker.getManagementContext();
         QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
-        
-        Object[] messages = (Object[]) queueView.browse();
-        
-		LOG.info("+Browsed with JMX: " + messages.length);
-        
+
+        Object[] messages = queueView.browse();
+
+        LOG.info("+Browsed with JMX: " + messages.length);
+
         return messages;
     }
-    
+
     protected ConnectionInfo createConnectionInfo() throws Exception {
         ConnectionInfo info = new ConnectionInfo();
         info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
         info.setClientId(info.getConnectionId().getValue());
         return info;
     }
-    
+
     protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
         SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
         return info;
     }
-    
+
     protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
         ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
         return info;
     }
-    
+
     protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
         ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
         info.setBrowser(false);
@@ -460,7 +460,7 @@ public class BrokerNetworkWithStuckMessa
         info.setDispatchAsync(false);
         return info;
     }
-    
+
     protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
         DestinationInfo info = new DestinationInfo();
         info.setConnectionId(connectionInfo.getConnectionId());
@@ -468,7 +468,7 @@ public class BrokerNetworkWithStuckMessa
         info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType));
         return info;
     }
-    
+
     protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
         if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
             DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType);
@@ -478,13 +478,13 @@ public class BrokerNetworkWithStuckMessa
             return ActiveMQDestination.createDestination(queueName, destinationType);
         }
     }
-    
+
     protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
         Message message = createMessage(producerInfo, destination);
         message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
         return message;
     }
-    
+
     protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
@@ -506,7 +506,7 @@ public class BrokerNetworkWithStuckMessa
         ack.setMessageCount(count);
         return ack;
     }
-    
+
     public Message receiveMessage(StubConnection connection) throws InterruptedException {
         return receiveMessage(connection, maxWait);
     }
@@ -530,5 +530,4 @@ public class BrokerNetworkWithStuckMessa
             }
         }
     }
-    
 }