You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by bs...@apache.org on 2009/11/16 17:37:10 UTC
svn commit: r880838 - in
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq:
broker/BrokerTestSupport.java network/BrokerNetworkWithStuckMessagesTest.java
Author: bsnyder
Date: Mon Nov 16 16:37:10 2009
New Revision: 880838
URL: http://svn.apache.org/viewvc?rev=880838&view=rev
Log:
First draft of a test for AMQ-2324 and AMQ-2484
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java?rev=880838&r1=880837&r2=880838&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java Mon Nov 16 16:37:10 2009
@@ -69,6 +69,8 @@
protected int txGenerator;
protected int tempDestGenerator;
protected PersistenceAdapter persistenceAdapter;
+
+ protected String queueName = "TEST";
protected int maxWait = 4000;
@@ -284,7 +286,7 @@
connection.send(info);
return info.getDestination();
} else {
- return ActiveMQDestination.createDestination("TEST", destinationType);
+ return ActiveMQDestination.createDestination(queueName, destinationType);
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=880838&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java Mon Nov 16 16:37:10 2009
@@ -0,0 +1,143 @@
+package org.apache.activemq.network;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Test for AMQ-2324 and AMQ-2484: messages left stuck on the remote side of a network bridge. */
+public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class);
+
+    /** Bridge between the local and remote brokers; created and started in setUp(). */
+    private DemandForwardingBridge bridge;
+
+    /** Starts a demand-forwarding bridge, then enables JMX on and names both brokers. */
+    protected void setUp() throws Exception {
+        super.setUp();
+        // Create a network bridge between the local and remote brokers so that
+        // demand-based forwarding can take place
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setBrokerName("local");
+        config.setDispatchAsync(false);
+        bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
+        bridge.setBrokerService(broker);
+        bridge.start();
+
+        // Enable JMX support on, and set the names of, the local and remote brokers.
+        // NOTE(review): this runs after super.setUp() and bridge.start(); confirm it still takes effect.
+        broker.setUseJmx(true);
+        remoteBroker.setUseJmx(true);
+        broker.setBrokerName("local");
+        remoteBroker.setBrokerName("remote");
+    }
+
+    /** Stops the bridge; the inherited tear-down still runs if stopping the bridge throws. */
+    protected void tearDown() throws Exception {
+        try {
+            bridge.stop();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Proves that messages forwarded to the remote broker stay there after its consumer closes. */
+    public void testBrokerNetworkWithStuckMessages() throws Exception {
+        int sendNumMessages = 10;
+        int receiveNumMessages = 5;
+        int stuckNumMessages = sendNumMessages - receiveNumMessages;
+
+        // Create a producer and send a batch of 10 messages to the local broker
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        // Create a destination on the local broker
+        ActiveMQDestination destinationInfo1 = null;
+        for (int i = 0; i < sendNumMessages; ++i) {
+            destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
+            connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
+        }
+
+        // Ensure that all of the sent messages are on the local broker
+        assertEquals(sendNumMessages, countMessagesInQueue(connection1, connectionInfo1, destinationInfo1));
+
+        // Create a consumer on the remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ActiveMQDestination destinationInfo2 =
+            createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        connection2.send(consumerInfo2);
+
+        // Consume 5 of the messages from the remote broker and ack them.
+        // Because the prefetch size is set to 1000, this will cause the
+        // messages on the local broker to be forwarded to the remote broker.
+        for (int i = 0; i < receiveNumMessages; ++i) {
+            Message message1 = receiveMessage(connection2);
+            assertNotNull(message1);
+            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        // Close the consumer on the remote broker
+        connection2.send(consumerInfo2.createRemoveCommand());
+
+        // Ensure that there are zero messages on the local broker. This tells
+        // us that those messages have been prefetched to the remote broker
+        // where the demand exists.
+        assertEquals(0, countMessagesInQueue(connection1, connectionInfo1, destinationInfo1));
+        // The unconsumed messages should now be stuck on the remote broker
+        assertEquals(stuckNumMessages, countMessagesInQueue(connection2, connectionInfo2, destinationInfo1));
+
+        // Create a consumer on the local broker just to confirm that it doesn't receive any messages
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1);
+        connection1.send(consumerInfo1);
+        Message message1 = receiveMessage(connection1);
+        //////////////////////////////////////////////////////
+        // An assertNull() is done here because this is currently the correct
+        // behavior. This is actually the purpose of this test - to prove that
+        // messages are stuck on the remote broker. AMQ-2324 aims to fix this
+        // situation so that messages don't get stuck.
+        assertNull(message1);
+        //////////////////////////////////////////////////////
+
+        // Re-create the consumer on the remote broker, then consume the
+        // remaining messages and ack them just to clean up the queue.
+        consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        connection2.send(consumerInfo2);
+        for (int i = 0; i < stuckNumMessages; ++i) {
+            message1 = receiveMessage(connection2);
+            assertNotNull(message1);
+            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        // Close the consumer on the remote broker
+        connection2.send(consumerInfo2.createRemoveCommand());
+
+        // Ensure that the queue on the remote broker is empty
+        assertEquals(0, countMessagesInQueue(connection2, connectionInfo2, destinationInfo2));
+    }
+
+    /** Returns the suite built from this test class. */
+    public static Test suite() {
+        return suite(BrokerNetworkWithStuckMessagesTest.class);
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
------------------------------------------------------------------------------
svn:keywords = Id Revision