You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/30 22:42:07 UTC
svn commit: r1367285 - in
/activemq/activemq-apollo/trunk/apollo-itests/src/test:
java/org/apache/activemq/apollo/JMSConsumerTest.java resources/apollo.xml
Author: chirino
Date: Mon Jul 30 20:42:07 2012
New Revision: 1367285
URL: http://svn.apache.org/viewvc?rev=1367285&view=rev
Log:
Fixed APLO-227: All tests are now passing and re-enabled.
Modified:
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java?rev=1367285&r1=1367284&r2=1367285&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java Mon Jul 30 20:42:07 2012
@@ -17,19 +17,11 @@
package org.apache.activemq.apollo;
import junit.framework.Test;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
-import javax.management.ObjectName;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -41,10 +33,6 @@ import java.util.concurrent.atomic.Atomi
/**
* Test cases used to test the JMS message consumer.
- *
- * This test case currently has lots of tests commented out.. Look for the
- * 'TODO's. https://issues.apache.org/jira/browse/APLO-227 is tracking these
- * issues.
*/
public class JMSConsumerTest extends JmsTestBase {
@@ -65,9 +53,7 @@ public class JMSConsumerTest extends Jms
public DestinationType destinationType;
public boolean durableConsumer;
- // TODO: figure out why this test does not work /w stompjms
public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -83,7 +69,7 @@ public class JMSConsumerTest extends Jms
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
- ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+ MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
counter.incrementAndGet();
@@ -103,7 +89,7 @@ public class JMSConsumerTest extends Jms
assertEquals(1, counter.get());
// Stop the consumer.
- consumer.stop();
+ connection.stop();
// Send a message, but should not get delivered.
sendMessages(session, destination, 1);
@@ -111,13 +97,15 @@ public class JMSConsumerTest extends Jms
assertEquals(1, counter.get());
// Start the consumer, and the message should now get delivered.
- consumer.start();
+ connection.start();
assertTrue(done2.await(1, TimeUnit.SECONDS));
assertEquals(2, counter.get());
}
- // TODO: figure out why this test causes a OOM /w the stompjms client.
public void initCombosForTestMessageListenerWithConsumerCanBeStoppedConcurently() {
+ // TODO: This test case fails for STOMP because it will client ack a message
+ // after it's associated consumer has been closed. Need to the check the JMS
+ // spec to see if this is valid.
setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
}
public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
@@ -133,11 +121,11 @@ public class JMSConsumerTest extends Jms
sendMessages(session, destination, 2000);
- final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+ final MessageConsumer consumer = session.createConsumer(destination);
final Map<Thread, Throwable> exceptions =
Collections.synchronizedMap(new HashMap<Thread, Throwable>());
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Uncaught exception:", e);
exceptions.put(t, e);
@@ -187,21 +175,17 @@ public class JMSConsumerTest extends Jms
public void initCombosForTestMutiReceiveWithPrefetch1() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- // TODO: test is failing.
- //
- public void ignoreMutiReceiveWithPrefetch1() throws Exception {
+
+ public void testMutiReceiveWithPrefetch1() throws Exception {
// Set prefetch to 1
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Use all the ack modes
@@ -222,11 +206,7 @@ public class JMSConsumerTest extends Jms
message.acknowledge();
}
- //
- // TODO: find out why this test is failing on a stompjms connection.
- //
public void initCombosForTestDurableConsumerSelectorChange() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.TOPIC_TYPE});
}
@@ -270,11 +250,7 @@ public class JMSConsumerTest extends Jms
assertNull(consumer.receiveNoWait());
}
- //
- // TODO: find out why this test is failing on a stompjms connection.
- //
public void initCombosForTestSendReceiveBytesMessage() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -303,11 +279,7 @@ public class JMSConsumerTest extends Jms
assertNull(consumer.receiveNoWait());
}
- //
- // TODO: find out why this test is failing on a stompjms connection.
- //
public void initCombosForTestSetMessageListenerAfterStart() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -344,10 +316,8 @@ public class JMSConsumerTest extends Jms
assertEquals(4, counter.get());
}
- //
- // TODO: find out why this test is failing on a stompjms connection.
- //
public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+ // This method tests an Openwire JMS specific extension (not applicable to stompjms).
setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
}
@@ -381,29 +351,19 @@ public class JMSConsumerTest extends Jms
}
public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- // TODO: find out why this test is failing
- //
- public void ignoreMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+ public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
// Set prefetch to 1
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
- // This test case does not work if optimized message dispatch is used as
- // the main thread send block until the consumer receives the
- // message. This test depends on thread decoupling so that the main
- // thread can stop the consumer thread.
- ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Use all the ack modes
@@ -437,10 +397,10 @@ public class JMSConsumerTest extends Jms
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
// Re-start connection.
- connection = (ActiveMQConnection)factory.createConnection();
+ connection = factory.createConnection();
connections.add(connection);
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Pickup the remaining messages.
@@ -455,7 +415,7 @@ public class JMSConsumerTest extends Jms
// order is not guaranteed as the connection is started before the listener is set.
// assertEquals("" + counter.get(), tm.getText());
counter.incrementAndGet();
- if (counter.get() == 4) {
+ if (counter.get() == 5) {
done2.countDown();
}
} catch (Throwable e) {
@@ -472,29 +432,19 @@ public class JMSConsumerTest extends Jms
}
public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- // TODO: find out why this test is failing
- //
- public void ignoreMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+ public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
// Set prefetch to 1
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
- // This test case does not work if optimized message dispatch is used as
- // the main thread send block until the consumer receives the
- // message. This test depends on thread decoupling so that the main
- // thread can stop the consumer thread.
- ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Use all the ack modes
@@ -528,10 +478,10 @@ public class JMSConsumerTest extends Jms
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
// Re-start connection.
- connection = (ActiveMQConnection)factory.createConnection();
+ connection = factory.createConnection();
connections.add(connection);
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Pickup the remaining messages.
@@ -562,22 +512,18 @@ public class JMSConsumerTest extends Jms
}
public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- //
public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch done = new CountDownLatch(1);
// Receive a message with the JMS API
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -602,9 +548,7 @@ public class JMSConsumerTest extends Jms
assertEquals(4, counter.get());
}
- // TODO: figure out why this is failing with the stompjms connections
public void initCombosForTestMessageListenerWithConsumer() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -640,20 +584,16 @@ public class JMSConsumerTest extends Jms
}
public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- //
public void testUnackedWithPrefetch1StayInQueue() throws Exception {
// Set prefetch to 1
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Use all the ack modes
@@ -673,9 +613,9 @@ public class JMSConsumerTest extends Jms
message.acknowledge();
connection.close();
- connection = (ActiveMQConnection)factory.createConnection();
+ connection = factory.createConnection();
connections.add(connection);
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
// Use all the ack modes
@@ -693,22 +633,17 @@ public class JMSConsumerTest extends Jms
}
public void initCombosForTestPrefetch1MessageNotDispatched() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- // TODO: find out why this test is failing
- //
- public void ignorePrefetch1MessageNotDispatched() throws Exception {
+ public void testPrefetch1MessageNotDispatched() throws Exception {
// Set prefetch to 1
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ protocol.setPrefetch(connection, 1);
connection.start();
Session session = connection.createSession(true, 0);
- destination = new ActiveMQQueue("TEST");
+ destination = createDestination(session, DestinationType.QUEUE_TYPE);
MessageConsumer consumer = session.createConsumer(destination);
// Send 2 messages to the destination.
@@ -719,7 +654,7 @@ public class JMSConsumerTest extends Jms
// Since prefetch is still full, the 2nd message should get dispatched
// to another consumer.. lets create the 2nd consumer test that it does
// make sure it does.
- ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
+ Connection connection2 = factory.createConnection();
connection2.start();
connections.add(connection2);
Session session2 = connection2.createSession(true, 0);
@@ -780,9 +715,7 @@ public class JMSConsumerTest extends Jms
assertNull(consumer.receiveNoWait());
}
- // TODO: figure out why this is failing with the stompjms connections
public void initCombosForTestReceiveMessageWithConsumer() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -824,52 +757,50 @@ public class JMSConsumerTest extends Jms
assertNotNull(m);
}
assertNull(consumer.receive(1000));
-
+
// Close out the consumer.. no other messages should be left on the queue.
consumer.close();
-
+
consumer = session.createConsumer(destination);
assertNull(consumer.receive(1000));
}
- // TODO: figure out why this is failing
- public void ignoreRedispatchOfUncommittedTx() throws Exception {
+ public void testRedispatchOfUncommittedTx() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
destination = createDestination(session, DestinationType.QUEUE_TYPE);
-
+
sendMessages(connection, destination, 2);
-
+
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(1000));
assertNotNull(consumer.receive(1000));
-
+
// install another consumer while message dispatch is unacked/uncommitted
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
// no commit so will auto rollback and get re-dispatched to redisptachConsumer
session.close();
-
+
Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue("redelivered flag set", msg.getJMSRedelivered());
assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
-
+
msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue(msg.getJMSRedelivered());
assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit();
-
+
assertNull(redispatchConsumer.receive(500));
redispatchSession.close();
}
- // TODO: figure out why this is failing
- public void ignoreRedispatchOfRolledbackTx() throws Exception {
+ public void testRedispatchOfRolledbackTx() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -904,31 +835,19 @@ public class JMSConsumerTest extends Jms
public void initCombosForTestAckOfExpired() {
- setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
addCombinationValues("destinationType",
new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
}
- //
- // TODO: support setting the prefetch policy on a stompjms connection.
- //
- // TODO: figure out why this is failing
- public void ignoreAckOfExpired() throws Exception {
-
-// ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
-// connection = fact.createActiveMQConnection();
-
- ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(4);
- ((ActiveMQConnection)connection).setSendAcksAsync(false);
+ public void testAckOfExpired() throws Exception {
+
+ protocol.setPrefetch(connection, 4);
connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
-
- MessageConsumer consumer = session.createConsumer(destination);
- // connection.setStatsEnabled(true);
-
- Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(destination);
producer.setTimeToLive(1000);
final int count = 4;
@@ -936,32 +855,22 @@ public class JMSConsumerTest extends Jms
TextMessage message = sendSession.createTextMessage("" + i);
producer.send(message);
}
-
+
// let first bunch in queue expire
Thread.sleep(2000);
-
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
producer.setTimeToLive(0);
for (int i = 0; i < count; i++) {
TextMessage message = sendSession.createTextMessage("no expiry" + i);
producer.send(message);
}
-
- ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
-
+
for(int i=0; i<count; i++) {
- TextMessage msg = (TextMessage) amqConsumer.receive();
+ TextMessage msg = (TextMessage) consumer.receive();
assertNotNull(msg);
assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
-
- // force an ack when there are expired messages
- amqConsumer.acknowledge();
}
- assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
-
- long t = protocol.getInflightCount(broker, destination);
- assertEquals("Wrong inFlightCount: " + t, 0, t);
- t = protocol.getDequeueCount(broker, destination);
- assertEquals("Wrong dequeue count: " + t, 8, t);
- //assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
}
}
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml?rev=1367285&r1=1367284&r2=1367285&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml Mon Jul 30 20:42:07 2012
@@ -31,6 +31,8 @@
</virtual_host>
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
- <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+ <connector id="tcp" bind="tcp://0.0.0.0:0">
+ <stomp add_redeliveries_header="JMSXDeliveryCount"/>
+ </connector>
</broker>
\ No newline at end of file