You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/07 19:20:32 UTC
svn commit: r1443642 - in
/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs:
AMQ1730Test.java AMQ2754Test.java LoadBalanceTest.java
Author: tabish
Date: Thu Feb 7 18:20:32 2013
New Revision: 1443642
URL: http://svn.apache.org/r1443642
Log:
Don't use hardcoded ports.
Modified:
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java Thu Feb 7 18:20:32 2013
@@ -17,12 +17,7 @@
package org.apache.bugs;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -33,17 +28,20 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
+import junit.framework.TestCase;
-public class AMQ1730Test extends TestCase {
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
- private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class);
+public class AMQ1730Test extends TestCase {
+ private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class);
private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
-
-
BrokerService brokerService;
private static final int MESSAGE_COUNT = 250;
@@ -55,6 +53,7 @@ public class AMQ1730Test extends TestCas
@Override
protected void setUp() throws Exception {
super.setUp();
+
brokerService = new BrokerService();
brokerService.addConnector("tcp://localhost:0");
brokerService.setUseJmx(false);
@@ -65,6 +64,7 @@ public class AMQ1730Test extends TestCas
protected void tearDown() throws Exception {
super.tearDown();
brokerService.stop();
+ brokerService.waitUntilStopped();
}
public void testRedelivery() throws Exception {
@@ -109,7 +109,7 @@ public class AMQ1730Test extends TestCas
messageListenerContainer.setSessionTransacted(false);
messageListenerContainer.setMessageListener(new MessageListener() {
-
+ @Override
public void onMessage(Message message) {
if (!(message instanceof TextMessage)) {
throw new RuntimeException();
@@ -159,7 +159,5 @@ public class AMQ1730Test extends TestCas
T get() {
return value;
}
-
}
-
}
Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java Thu Feb 7 18:20:32 2013
@@ -35,11 +35,11 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
-//import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
+//import org.apache.activemq.pool.PooledConnectionFactory;
public class AMQ2754Test extends TestCase {
@@ -47,110 +47,117 @@ public class AMQ2754Test extends TestCas
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
+ String broker1Uri;
+ String broker2Uri;
+
final int total = 100;
final CountDownLatch latch = new CountDownLatch(total);
final boolean conduitSubscriptions = true;
try {
- {
- brokerService1 = new BrokerService();
- brokerService1.setBrokerName("consumer");
- brokerService1.setUseJmx(false);
- brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
- brokerService1.addConnector("tcp://0.0.0.0:61616");
- brokerService1.start();
- }
+ {
+ brokerService1 = new BrokerService();
+ brokerService1.setBrokerName("consumer");
+ brokerService1.setUseJmx(false);
+ brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ broker1Uri = brokerService1.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
+ brokerService1.start();
+ }
- {
- brokerService2 = new BrokerService();
- brokerService2.setBrokerName("producer");
- brokerService2.setUseJmx(false);
- brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
- brokerService2.addConnector("tcp://0.0.0.0:51515");
- NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
- network2.setName("network1");
- network2.setDynamicOnly(true);
- network2.setConduitSubscriptions(conduitSubscriptions);
- network2.setNetworkTTL(3);
- network2.setPrefetchSize(1);
- brokerService2.start();
- }
+ {
+ brokerService2 = new BrokerService();
+ brokerService2.setBrokerName("producer");
+ brokerService2.setUseJmx(false);
+ brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ broker2Uri = brokerService2.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
+ NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
+ network2.setName("network1");
+ network2.setDynamicOnly(true);
+ network2.setConduitSubscriptions(conduitSubscriptions);
+ network2.setNetworkTTL(3);
+ network2.setPrefetchSize(1);
+ brokerService2.start();
+ }
- ExecutorService pool = Executors.newSingleThreadExecutor();
+ ExecutorService pool = Executors.newSingleThreadExecutor();
- ActiveMQConnectionFactory connectionFactory1 =
- new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
+ ActiveMQConnectionFactory connectionFactory1 =
+ new ActiveMQConnectionFactory("failover:("+broker1Uri+")");
- connectionFactory1.setWatchTopicAdvisories(false);
- final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
- container.setConnectionFactory(connectionFactory1);
- container.setMaxConcurrentConsumers(10);
- container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
- container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
- container.setDestination(new ActiveMQQueue("testingqueue"));
- container.setMessageListener(new MessageListener() {
- public void onMessage(Message message) {
- latch.countDown();
- }
- });
- container.setMaxMessagesPerTask(1);
- container.afterPropertiesSet();
- container.start();
-
- pool.submit(new Callable<Object>() {
- public Object call() throws Exception {
- try {
- final int batch = 10;
- ActiveMQConnectionFactory connectionFactory2 =
- new ActiveMQConnectionFactory("failover:(tcp://localhost:51515)");
- PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
- connectionFactory2.setWatchTopicAdvisories(false);
- JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
- ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
- for(int b = 0; b < batch; b++) {
- for(int i = 0; i < (total / batch); i++) {
- final String id = ":batch=" + b + "i=" + i;
- template.send(queue, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- TextMessage message = session.createTextMessage();
- message.setText("Hello World!" + id);
- return message;
- }
- });
- }
- // give spring time to scale back again
- while(container.getActiveConsumerCount() > 1) {
- System.out.println("active consumer count:" + container.getActiveConsumerCount());
- System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
- Thread.sleep(1000);
+ connectionFactory1.setWatchTopicAdvisories(false);
+ final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+ container.setConnectionFactory(connectionFactory1);
+ container.setMaxConcurrentConsumers(10);
+ container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+ container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
+ container.setDestination(new ActiveMQQueue("testingqueue"));
+ container.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+ container.setMaxMessagesPerTask(1);
+ container.afterPropertiesSet();
+ container.start();
+
+ final String finalBroker2Uri = broker2Uri;
+
+ pool.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ try {
+ final int batch = 10;
+ ActiveMQConnectionFactory connectionFactory2 =
+ new ActiveMQConnectionFactory("failover:("+finalBroker2Uri+")");
+ PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
+ connectionFactory2.setWatchTopicAdvisories(false);
+ JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
+ ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
+ for(int b = 0; b < batch; b++) {
+ for(int i = 0; i < (total / batch); i++) {
+ final String id = ":batch=" + b + "i=" + i;
+ template.send(queue, new MessageCreator() {
+ @Override
+ public Message createMessage(Session session) throws JMSException {
+ TextMessage message = session.createTextMessage();
+ message.setText("Hello World!" + id);
+ return message;
+ }
+ });
+ }
+ // give spring time to scale back again
+ while(container.getActiveConsumerCount() > 1) {
+ System.out.println("active consumer count:" + container.getActiveConsumerCount());
+ System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
+ Thread.sleep(1000);
+ }
}
+ //pooledConnectionFactory.stop();
+ } catch(Throwable t) {
+ t.printStackTrace();
}
- //pooledConnectionFactory.stop();
- } catch(Throwable t) {
- t.printStackTrace();
+ return null;
}
- return null;
- }
- });
-
- pool.shutdown();
- pool.awaitTermination(10, TimeUnit.SECONDS);
+ });
- int count = 0;
+ pool.shutdown();
+ pool.awaitTermination(10, TimeUnit.SECONDS);
- // give it 20 seconds
- while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
- System.out.println("count " + latch.getCount());
- }
+ int count = 0;
+ // give it 20 seconds
+ while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
+ System.out.println("count " + latch.getCount());
+ }
- container.destroy();
+ container.destroy();
} finally {
- try { if(brokerService1 != null) {
+ try { if(brokerService1 != null) {
brokerService1.stop();
}} catch(Throwable t) { t.printStackTrace(); }
- try { if(brokerService2 != null) {
+ try { if(brokerService2 != null) {
brokerService2.stop();
}} catch(Throwable t) { t.printStackTrace(); }
}
@@ -158,7 +165,5 @@ public class AMQ2754Test extends TestCas
if(latch.getCount() > 0) {
fail("latch should have gone down to 0 but was " + latch.getCount());
}
-
}
-
-}
\ No newline at end of file
+}
\ No newline at end of file
Modified: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java?rev=1443642&r1=1443641&r2=1443642&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java (original)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java Thu Feb 7 18:20:32 2013
@@ -39,9 +39,9 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.junit.Ignore;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@@ -61,48 +61,49 @@ public class LoadBalanceTest {
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
final CountDownLatch startProducer = new CountDownLatch(1);
+
+ String broker1Uri;
+ String broker2Uri;
+
try {
- {
- brokerService1 = new BrokerService();
- brokerService1.setBrokerName("one");
- brokerService1.setUseJmx(false);
- brokerService1
- .setPersistenceAdapter(new MemoryPersistenceAdapter());
- brokerService1.addConnector("nio://0.0.0.0:61616");
- final NetworkConnector network1 = brokerService1
- .addNetworkConnector("static:(tcp://localhost:51515)");
- network1.setName("network1");
- network1.setDynamicOnly(true);
- network1.setNetworkTTL(3);
- network1.setPrefetchSize(networkBridgePrefetch);
- network1.setConduitSubscriptions(false);
- network1.setDecreaseNetworkConsumerPriority(false);
- network1.setDispatchAsync(false);
- brokerService1.start();
- }
- {
- brokerService2 = new BrokerService();
- brokerService2.setBrokerName("two");
- brokerService2.setUseJmx(false);
- brokerService2
- .setPersistenceAdapter(new MemoryPersistenceAdapter());
- brokerService2.addConnector("nio://0.0.0.0:51515");
- final NetworkConnector network2 = brokerService2
- .addNetworkConnector("static:(tcp://localhost:61616)");
- network2.setName("network1");
- network2.setDynamicOnly(true);
- network2.setNetworkTTL(3);
- network2.setPrefetchSize(networkBridgePrefetch);
- network2.setConduitSubscriptions(false);
- network2.setDecreaseNetworkConsumerPriority(false);
- network2.setDispatchAsync(false);
- brokerService2.start();
- }
+
+
+ brokerService1 = new BrokerService();
+ brokerService1.setBrokerName("one");
+ brokerService1.setUseJmx(false);
+ brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ broker1Uri = brokerService1.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
+
+ brokerService2 = new BrokerService();
+ brokerService2.setBrokerName("two");
+ brokerService2.setUseJmx(false);
+ brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ broker2Uri = brokerService2.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
+
+ final NetworkConnector network1 = brokerService1.addNetworkConnector("static:("+broker2Uri+")");
+ network1.setName("network1");
+ network1.setDynamicOnly(true);
+ network1.setNetworkTTL(3);
+ network1.setPrefetchSize(networkBridgePrefetch);
+ network1.setConduitSubscriptions(false);
+ network1.setDecreaseNetworkConsumerPriority(false);
+ network1.setDispatchAsync(false);
+
+ final NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
+ network2.setName("network1");
+ network2.setDynamicOnly(true);
+ network2.setNetworkTTL(3);
+ network2.setPrefetchSize(networkBridgePrefetch);
+ network2.setConduitSubscriptions(false);
+ network2.setDecreaseNetworkConsumerPriority(false);
+ network2.setDispatchAsync(false);
+
+ brokerService1.start();
+ brokerService2.start();
+
final ExecutorService pool = Executors.newSingleThreadExecutor();
- final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
- "vm://one");
- final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
- connectionFactory1);
+ final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("vm://one");
+ final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
@@ -110,6 +111,7 @@ public class LoadBalanceTest {
container1.setDestination(new ActiveMQQueue("testingqueue"));
container1.setMessageListener(new MessageListener() {
+ @Override
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
@@ -118,6 +120,7 @@ public class LoadBalanceTest {
container1.start();
pool.submit(new Callable<Object>() {
+ @Override
public Object call() throws Exception {
try {
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@@ -133,6 +136,7 @@ public class LoadBalanceTest {
"testingqueue"));
container2.setMessageListener(new MessageListener() {
+ @Override
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
@@ -151,6 +155,7 @@ public class LoadBalanceTest {
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
+ @Override
public Message createMessage(
final Session session)
throws JMSException {
@@ -197,6 +202,7 @@ public class LoadBalanceTest {
try {
if (brokerService1 != null) {
brokerService1.stop();
+ brokerService1.waitUntilStopped();
}
} catch (final Throwable t) {
t.printStackTrace();
@@ -204,12 +210,13 @@ public class LoadBalanceTest {
try {
if (brokerService2 != null) {
brokerService2.stop();
+ brokerService2.waitUntilStopped();
}
} catch (final Throwable t) {
t.printStackTrace();
}
}
-
+
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
@@ -240,6 +247,7 @@ public class LoadBalanceTest {
container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container1.setMessageListener(new MessageListener() {
+ @Override
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
@@ -248,6 +256,7 @@ public class LoadBalanceTest {
container1.start();
pool.submit(new Callable<Object>() {
+ @Override
public Object call() throws Exception {
System.setProperty("lbt.brokerName", "two");
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@@ -261,16 +270,17 @@ public class LoadBalanceTest {
container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container2.setMessageListener(new MessageListener() {
+ @Override
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
-
-
+
+
assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
-
+
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
@@ -279,6 +289,7 @@ public class LoadBalanceTest {
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
+ @Override
public Message createMessage(final Session session)
throws JMSException {
final TextMessage message = session
@@ -291,14 +302,14 @@ public class LoadBalanceTest {
return null;
}
});
-
+
// give network a chance to build, needs advisories
waitForBridgeFormation();
startProducer.countDown();
-
+
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
-
+
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;