You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/09 23:36:17 UTC
svn commit: r1431123 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Author: tabish
Date: Wed Jan 9 22:36:17 2013
New Revision: 1431123
URL: http://svn.apache.org/viewvc?rev=1431123&view=rev
Log:
Fix another test that fails because of https://issues.apache.org/jira/browse/AMQ-4237
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=1431123&r1=1431122&r2=1431123&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Wed Jan 9 22:36:17 2013
@@ -39,7 +39,6 @@ import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.StubConnection;
@@ -68,87 +67,87 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class duplicates most of the functionality in {@link NetworkTestSupport}
- * and {@link BrokerTestSupport} because more control was needed over how brokers
- * and connectors are created. Also, this test asserts message counts via JMX on
- * each broker.
- *
- * @author bsnyder
- *
+ * This class duplicates most of the functionality in {@link NetworkTestSupport}
+ * and {@link BrokerTestSupport} because more control was needed over how brokers
+ * and connectors are created. Also, this test asserts message counts via JMX on
+ * each broker.
*/
public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSupport*/ {
-
+
private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);
-
- private BrokerService localBroker;
- private BrokerService remoteBroker;
+
+ private BrokerService localBroker;
+ private BrokerService remoteBroker;
private DemandForwardingBridge bridge;
-
+
protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
- protected ArrayList connections = new ArrayList();
-
+ protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+
protected TransportConnector connector;
protected TransportConnector remoteConnector;
-
+
protected long idGenerator;
protected int msgIdGenerator;
protected int tempDestGenerator;
protected int maxWait = 4000;
protected String queueName = "TEST";
-
+
protected String amqDomain = "org.apache.activemq";
-
+
+ @Override
protected void setUp() throws Exception {
-
- // For those who want visual confirmation:
- // Uncomment the following to enable JMX support on a port number to use
- // Jconsole to view each broker. You will need to add some calls to
- // Thread.sleep() to be able to actually slow things down so that you
- // can manually see JMX attrs.
+
+ // For those who want visual confirmation:
+ // Uncomment the following to enable JMX support on a port number to use
+ // Jconsole to view each broker. You will need to add some calls to
+ // Thread.sleep() to be able to actually slow things down so that you
+ // can manually see JMX attrs.
// System.setProperty("com.sun.management.jmxremote", "");
// System.setProperty("com.sun.management.jmxremote.port", "1099");
// System.setProperty("com.sun.management.jmxremote.authenticate", "false");
// System.setProperty("com.sun.management.jmxremote.ssl", "false");
-
- // Create the local broker
+
+ // Create the local broker
createBroker();
- // Create the remote broker
+ // Create the remote broker
createRemoteBroker();
-
+
// Remove the activemq-data directory from the creation of the remote broker
FileUtils.deleteDirectory(new File("activemq-data"));
-
- // Create a network bridge between the local and remote brokers so that
+
+ // Create a network bridge between the local and remote brokers so that
// demand-based forwarding can take place
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setBrokerName("local");
config.setDispatchAsync(false);
config.setDuplex(true);
- Transport localTransport = createTransport();
+ Transport localTransport = createTransport();
Transport remoteTransport = createRemoteTransport();
-
- // Create a network bridge between the two brokers
+
+ // Create a network bridge between the two brokers
bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
bridge.setBrokerService(localBroker);
bridge.start();
-
+
waitForBridgeFormation();
-
+
}
-
+
protected void waitForBridgeFormation() throws Exception {
for (final BrokerService broker : brokers.values()) {
if (!broker.getNetworkConnectors().isEmpty()) {
- // Max wait here is 30 secs
+ // Max wait here is 30 secs
Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
}});
}
}
}
-
+
+ @Override
protected void tearDown() throws Exception {
bridge.stop();
localBroker.stop();
@@ -156,11 +155,11 @@ public class BrokerNetworkWithStuckMessa
}
public void testBrokerNetworkWithStuckMessages() throws Exception {
-
+
int sendNumMessages = 10;
int receiveNumMessages = 5;
-
- // Create a producer
+
+ // Create a producer
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
@@ -168,41 +167,41 @@ public class BrokerNetworkWithStuckMessa
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
-
- // Create a destination on the local broker
+
+ // Create a destination on the local broker
ActiveMQDestination destinationInfo1 = null;
-
- // Send a 10 messages to the local broker
+
+ // Send a 10 messages to the local broker
for (int i = 0; i < sendNumMessages; ++i) {
destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
}
-
- // Ensure that there are 10 messages on the local broker
+
+ // Ensure that there are 10 messages on the local broker
Object[] messages = browseQueueWithJmx(localBroker);
assertEquals(sendNumMessages, messages.length);
-
-
- // Create a synchronous consumer on the remote broker
+
+
+ // Create a synchronous consumer on the remote broker
StubConnection connection2 = createRemoteConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
- ActiveMQDestination destinationInfo2 =
+ ActiveMQDestination destinationInfo2 =
createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
connection2.send(consumerInfo2);
-
- // Consume 5 of the messages from the remote broker and ack them.
+
+ // Consume 5 of the messages from the remote broker and ack them.
for (int i = 0; i < receiveNumMessages; ++i) {
Message message1 = receiveMessage(connection2, 20000);
assertNotNull(message1);
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
}
-
- // Ensure that there are zero messages on the local broker. This tells
- // us that those messages have been prefetched to the remote broker
+
+ // Ensure that there are zero messages on the local broker. This tells
+ // us that those messages have been prefetched to the remote broker
// where the demand exists.
Wait.waitFor(new Wait.Condition() {
@Override
@@ -215,7 +214,7 @@ public class BrokerNetworkWithStuckMessa
assertEquals(0, messages.length);
LOG.info("Closing consumer on remote");
- // Close the consumer on the remote broker
+ // Close the consumer on the remote broker
connection2.send(consumerInfo2.createRemoveCommand());
// also close connection etc.. so messages get dropped from the local consumer q
connection2.send(connectionInfo2.createRemoveCommand());
@@ -259,7 +258,7 @@ public class BrokerNetworkWithStuckMessa
assertNull("Messages have migrated back: " + message1, message1);
// Consume the last 4 messages from the local broker and ack them just
- // to clean up the queue.
+ // to clean up the queue.
int counter = 1;
for (; counter < receiveNumMessages; counter++) {
message1 = receiveMessage(connection1);
@@ -289,15 +288,15 @@ public class BrokerNetworkWithStuckMessa
messages = browseQueueWithJmx(localBroker);
assertEquals(0, messages.length);
- // Close the consumer on the remote broker
+ // Close the consumer on the remote broker
connection2.send(consumerInfo3.createRemoveCommand());
-
+
connection1.stop();
connection2.stop();
}
-
+
protected BrokerService createBroker() throws Exception {
- localBroker = new BrokerService();
+ localBroker = new BrokerService();
localBroker.setBrokerName("localhost");
localBroker.setUseJmx(true);
localBroker.setPersistenceAdapter(null);
@@ -307,11 +306,11 @@ public class BrokerNetworkWithStuckMessa
configureBroker(localBroker);
localBroker.start();
localBroker.waitUntilStarted();
-
+
localBroker.getManagementContext().setConnectorPort(2221);
-
+
brokers.put(localBroker.getBrokerName(), localBroker);
-
+
return localBroker;
}
@@ -337,32 +336,32 @@ public class BrokerNetworkWithStuckMessa
configureBroker(remoteBroker);
remoteBroker.start();
remoteBroker.waitUntilStarted();
-
+
remoteBroker.getManagementContext().setConnectorPort(2222);
-
+
brokers.put(remoteBroker.getBrokerName(), remoteBroker);
-
+
return remoteBroker;
}
-
+
protected Transport createTransport() throws Exception {
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
return transport;
}
-
+
protected Transport createRemoteTransport() throws Exception {
Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
return transport;
}
-
+
protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
}
-
+
protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
}
-
+
protected String getRemoteURI() {
return "vm://remotehost";
}
@@ -370,7 +369,7 @@ public class BrokerNetworkWithStuckMessa
protected String getLocalURI() {
return "vm://localhost";
}
-
+
protected StubConnection createConnection() throws Exception {
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
StubConnection connection = new StubConnection(transport);
@@ -384,74 +383,75 @@ public class BrokerNetworkWithStuckMessa
connections.add(connection);
return connection;
}
-
- @SuppressWarnings("unchecked")
+
+ @SuppressWarnings({ "unchecked", "unused" })
private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
- Object[] messages = null;
- Connection connection = null;
- Session session = null;
-
- try {
- URI brokerUri = connector.getUri();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(queueName);
- QueueBrowser browser = session.createBrowser(destination);
- List<Message> list = new ArrayList<Message>();
- for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
- list.add(enumn.nextElement());
- }
- messages = list.toArray();
- }
- finally {
- if (session != null) {
- session.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- LOG.info("+Browsed with JMS: " + messages.length);
-
- return messages;
- }
-
+ Object[] messages = null;
+ Connection connection = null;
+ Session session = null;
+
+ try {
+ URI brokerUri = connector.getUri();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(queueName);
+ QueueBrowser browser = session.createBrowser(destination);
+ List<Message> list = new ArrayList<Message>();
+ for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
+ list.add(enumn.nextElement());
+ }
+ messages = list.toArray();
+ }
+ finally {
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ LOG.info("+Browsed with JMS: " + messages.length);
+
+ return messages;
+ }
+
private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
Hashtable<String, String> params = new Hashtable<String, String>();
- params.put("BrokerName", broker.getBrokerName());
- params.put("Type", "Queue");
- params.put("Destination", queueName);
+ params.put("brokerName", broker.getBrokerName());
+ params.put("type", "Broker");
+ params.put("destinationType", "Queue");
+ params.put("destinationName", queueName);
ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
-
+
ManagementContext mgmtCtx = broker.getManagementContext();
QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
-
- Object[] messages = (Object[]) queueView.browse();
-
- LOG.info("+Browsed with JMX: " + messages.length);
-
+
+ Object[] messages = queueView.browse();
+
+ LOG.info("+Browsed with JMX: " + messages.length);
+
return messages;
}
-
+
protected ConnectionInfo createConnectionInfo() throws Exception {
ConnectionInfo info = new ConnectionInfo();
info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
info.setClientId(info.getConnectionId().getValue());
return info;
}
-
+
protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
return info;
}
-
+
protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
return info;
}
-
+
protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
info.setBrowser(false);
@@ -460,7 +460,7 @@ public class BrokerNetworkWithStuckMessa
info.setDispatchAsync(false);
return info;
}
-
+
protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionInfo.getConnectionId());
@@ -468,7 +468,7 @@ public class BrokerNetworkWithStuckMessa
info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType));
return info;
}
-
+
protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType);
@@ -478,13 +478,13 @@ public class BrokerNetworkWithStuckMessa
return ActiveMQDestination.createDestination(queueName, destinationType);
}
}
-
+
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
return message;
}
-
+
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
@@ -506,7 +506,7 @@ public class BrokerNetworkWithStuckMessa
ack.setMessageCount(count);
return ack;
}
-
+
public Message receiveMessage(StubConnection connection) throws InterruptedException {
return receiveMessage(connection, maxWait);
}
@@ -530,5 +530,4 @@ public class BrokerNetworkWithStuckMessa
}
}
}
-
}