You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/30 22:42:07 UTC

svn commit: r1367285 - in /activemq/activemq-apollo/trunk/apollo-itests/src/test: java/org/apache/activemq/apollo/JMSConsumerTest.java resources/apollo.xml

Author: chirino
Date: Mon Jul 30 20:42:07 2012
New Revision: 1367285

URL: http://svn.apache.org/viewvc?rev=1367285&view=rev
Log:
Fixed APLO-227: All tests are now passing and re-enabled.

Modified:
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java?rev=1367285&r1=1367284&r2=1367285&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java Mon Jul 30 20:42:07 2012
@@ -17,19 +17,11 @@
 package org.apache.activemq.apollo;
 
 import junit.framework.Test;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
-import javax.management.ObjectName;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,10 +33,6 @@ import java.util.concurrent.atomic.Atomi
 
 /**
  * Test cases used to test the JMS message consumer.
- *
- * This test case currently has lots of tests commented out.. Look for the
- * 'TODO's.  https://issues.apache.org/jira/browse/APLO-227 is tracking these
- * issues.
  */
 public class JMSConsumerTest extends JmsTestBase {
 
@@ -65,9 +53,7 @@ public class JMSConsumerTest extends Jms
     public DestinationType destinationType;
     public boolean durableConsumer;
 
-    // TODO: figure out why this test does not work /w stompjms
     public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -83,7 +69,7 @@ public class JMSConsumerTest extends Jms
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         destination = createDestination(session, destinationType);
-        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
         consumer.setMessageListener(new MessageListener() {
             public void onMessage(Message m) {
                 counter.incrementAndGet();
@@ -103,7 +89,7 @@ public class JMSConsumerTest extends Jms
         assertEquals(1, counter.get());
 
         // Stop the consumer.
-        consumer.stop();
+        connection.stop();
 
         // Send a message, but should not get delivered.
         sendMessages(session, destination, 1);
@@ -111,13 +97,15 @@ public class JMSConsumerTest extends Jms
         assertEquals(1, counter.get());
 
         // Start the consumer, and the message should now get delivered.
-        consumer.start();
+        connection.start();
         assertTrue(done2.await(1, TimeUnit.SECONDS));
         assertEquals(2, counter.get());
     }
 
-    // TODO: figure out why this test causes a OOM /w the stompjms client.
     public void initCombosForTestMessageListenerWithConsumerCanBeStoppedConcurently() {
+        // TODO: This test case fails for STOMP because it will client ack a message
+        // after it's associated consumer has been closed.  Need to the check the JMS
+        // spec to see if this is valid.
         setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
     }
     public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
@@ -133,11 +121,11 @@ public class JMSConsumerTest extends Jms
         sendMessages(session, destination, 2000);
 
 
-        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+        final MessageConsumer consumer = session.createConsumer(destination);
 
         final Map<Thread, Throwable> exceptions =
             Collections.synchronizedMap(new HashMap<Thread, Throwable>());
-        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             public void uncaughtException(Thread t, Throwable e) {
                 LOG.error("Uncaught exception:", e);
                 exceptions.put(t, e);
@@ -187,21 +175,17 @@ public class JMSConsumerTest extends Jms
 
 
     public void initCombosForTestMutiReceiveWithPrefetch1() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
                                                       Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
     }
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    // TODO: test is failing.
-    //
-    public void ignoreMutiReceiveWithPrefetch1() throws Exception {
+
+    public void testMutiReceiveWithPrefetch1() throws Exception {
 
         // Set prefetch to 1
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Use all the ack modes
@@ -222,11 +206,7 @@ public class JMSConsumerTest extends Jms
         message.acknowledge();
     }
 
-    //
-    // TODO: find out why this test is failing on a stompjms connection.
-    //
     public void initCombosForTestDurableConsumerSelectorChange() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.TOPIC_TYPE});
     }
@@ -270,11 +250,7 @@ public class JMSConsumerTest extends Jms
         assertNull(consumer.receiveNoWait());
     }
 
-    //
-    // TODO: find out why this test is failing on a stompjms connection.
-    //
     public void initCombosForTestSendReceiveBytesMessage() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -303,11 +279,7 @@ public class JMSConsumerTest extends Jms
         assertNull(consumer.receiveNoWait());
     }
 
-    //
-    // TODO: find out why this test is failing on a stompjms connection.
-    //
     public void initCombosForTestSetMessageListenerAfterStart() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -344,10 +316,8 @@ public class JMSConsumerTest extends Jms
         assertEquals(4, counter.get());
     }
 
-    //
-    // TODO: find out why this test is failing on a stompjms connection.
-    //
     public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+        // This method tests an Openwire JMS specific extension (not applicable to stompjms).
         setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
     }
@@ -381,29 +351,19 @@ public class JMSConsumerTest extends Jms
     }
 
     public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
     }
 
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    // TODO: find out why this test is failing
-    //
-    public void ignoreMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
 
         final AtomicInteger counter = new AtomicInteger(0);
         final CountDownLatch sendDone = new CountDownLatch(1);
         final CountDownLatch got2Done = new CountDownLatch(1);
 
         // Set prefetch to 1
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
-        // This test case does not work if optimized message dispatch is used as
-        // the main thread send block until the consumer receives the
-        // message. This test depends on thread decoupling so that the main
-        // thread can stop the consumer thread.
-        ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Use all the ack modes
@@ -437,10 +397,10 @@ public class JMSConsumerTest extends Jms
         assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
 
         // Re-start connection.
-        connection = (ActiveMQConnection)factory.createConnection();
+        connection = factory.createConnection();
         connections.add(connection);
 
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Pickup the remaining messages.
@@ -455,7 +415,7 @@ public class JMSConsumerTest extends Jms
                     // order is not guaranteed as the connection is started before the listener is set.
                     // assertEquals("" + counter.get(), tm.getText());
                     counter.incrementAndGet();
-                    if (counter.get() == 4) {
+                    if (counter.get() == 5) {
                         done2.countDown();
                     }
                 } catch (Throwable e) {
@@ -472,29 +432,19 @@ public class JMSConsumerTest extends Jms
     }
 
     public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
     }
 
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    // TODO: find out why this test is failing
-    //
-    public void ignoreMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
 
         final AtomicInteger counter = new AtomicInteger(0);
         final CountDownLatch sendDone = new CountDownLatch(1);
         final CountDownLatch got2Done = new CountDownLatch(1);
 
         // Set prefetch to 1
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
-        // This test case does not work if optimized message dispatch is used as
-        // the main thread send block until the consumer receives the
-        // message. This test depends on thread decoupling so that the main
-        // thread can stop the consumer thread.
-        ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Use all the ack modes
@@ -528,10 +478,10 @@ public class JMSConsumerTest extends Jms
         assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
 
         // Re-start connection.
-        connection = (ActiveMQConnection)factory.createConnection();
+        connection = factory.createConnection();
         connections.add(connection);
 
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Pickup the remaining messages.
@@ -562,22 +512,18 @@ public class JMSConsumerTest extends Jms
     }
 
     public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
     }
 
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    //
     public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
 
         final AtomicInteger counter = new AtomicInteger(0);
         final CountDownLatch done = new CountDownLatch(1);
 
         // Receive a message with the JMS API
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -602,9 +548,7 @@ public class JMSConsumerTest extends Jms
         assertEquals(4, counter.get());
     }
 
-    // TODO: figure out why this is failing with the stompjms connections
     public void initCombosForTestMessageListenerWithConsumer() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -640,20 +584,16 @@ public class JMSConsumerTest extends Jms
     }
 
     public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
                                                       Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
     }
 
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    //
     public void testUnackedWithPrefetch1StayInQueue() throws Exception {
 
         // Set prefetch to 1
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Use all the ack modes
@@ -673,9 +613,9 @@ public class JMSConsumerTest extends Jms
         message.acknowledge();
 
         connection.close();
-        connection = (ActiveMQConnection)factory.createConnection();
+        connection = factory.createConnection();
         connections.add(connection);
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         // Use all the ack modes
@@ -693,22 +633,17 @@ public class JMSConsumerTest extends Jms
     }
 
     public void initCombosForTestPrefetch1MessageNotDispatched() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
     }
 
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    // TODO: find out why this test is failing
-    //
-    public void ignorePrefetch1MessageNotDispatched() throws Exception {
+    public void testPrefetch1MessageNotDispatched() throws Exception {
 
         // Set prefetch to 1
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        protocol.setPrefetch(connection, 1);
         connection.start();
 
         Session session = connection.createSession(true, 0);
-        destination = new ActiveMQQueue("TEST");
+        destination = createDestination(session, DestinationType.QUEUE_TYPE);
         MessageConsumer consumer = session.createConsumer(destination);
 
         // Send 2 messages to the destination.
@@ -719,7 +654,7 @@ public class JMSConsumerTest extends Jms
         // Since prefetch is still full, the 2nd message should get dispatched
         // to another consumer.. lets create the 2nd consumer test that it does
         // make sure it does.
-        ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
+        Connection connection2 = factory.createConnection();
         connection2.start();
         connections.add(connection2);
         Session session2 = connection2.createSession(true, 0);
@@ -780,9 +715,7 @@ public class JMSConsumerTest extends Jms
         assertNull(consumer.receiveNoWait());
     }
 
-    // TODO: figure out why this is failing with the stompjms connections
     public void initCombosForTestReceiveMessageWithConsumer() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
                                                               DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
@@ -824,52 +757,50 @@ public class JMSConsumerTest extends Jms
             assertNotNull(m);
         }
         assertNull(consumer.receive(1000));
-        
+
         // Close out the consumer.. no other messages should be left on the queue.
         consumer.close();
-        
+
         consumer = session.createConsumer(destination);
         assertNull(consumer.receive(1000));
     }
 
-    // TODO: figure out why this is failing
-    public void ignoreRedispatchOfUncommittedTx() throws Exception {
+    public void testRedispatchOfUncommittedTx() throws Exception {
 
         connection.start();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         destination = createDestination(session, DestinationType.QUEUE_TYPE);
-        
+
         sendMessages(connection, destination, 2);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
         assertNotNull(consumer.receive(1000));
         assertNotNull(consumer.receive(1000));
-        
+
         // install another consumer while message dispatch is unacked/uncommitted
         Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
 
         // no commit so will auto rollback and get re-dispatched to redisptachConsumer
         session.close();
-                
+
         Message msg = redispatchConsumer.receive(1000);
         assertNotNull(msg);
         assertTrue("redelivered flag set", msg.getJMSRedelivered());
         assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
-        
+
         msg = redispatchConsumer.receive(1000);
         assertNotNull(msg);
         assertTrue(msg.getJMSRedelivered());
         assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
         redispatchSession.commit();
-        
+
         assertNull(redispatchConsumer.receive(500));
         redispatchSession.close();
     }
 
     
-    // TODO: figure out why this is failing
-    public void ignoreRedispatchOfRolledbackTx() throws Exception {
+    public void testRedispatchOfRolledbackTx() throws Exception {
 
         connection.start();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -904,31 +835,19 @@ public class JMSConsumerTest extends Jms
     
     
     public void initCombosForTestAckOfExpired() {
-        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
         addCombinationValues("destinationType",
                 new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
     }
         
-    //
-    // TODO: support setting the prefetch policy on a stompjms connection.
-    //
-    // TODO: figure out why this is failing
-    public void ignoreAckOfExpired() throws Exception {
-        
-//        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
-//        connection = fact.createActiveMQConnection();
-        
-        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(4);
-        ((ActiveMQConnection)connection).setSendAcksAsync(false);
+    public void testAckOfExpired() throws Exception {
+
+        protocol.setPrefetch(connection, 4);
 
         connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         destination = createDestination(session, destinationType);
-                    
-        MessageConsumer consumer = session.createConsumer(destination);
-        // connection.setStatsEnabled(true);
-                
-        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
+
+        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = sendSession.createProducer(destination);
         producer.setTimeToLive(1000);
         final int count = 4;
@@ -936,32 +855,22 @@ public class JMSConsumerTest extends Jms
             TextMessage message = sendSession.createTextMessage("" + i);
             producer.send(message);
         }
-        
+
         // let first bunch in queue expire
         Thread.sleep(2000);
-        
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
         producer.setTimeToLive(0);
         for (int i = 0; i < count; i++) {
             TextMessage message = sendSession.createTextMessage("no expiry" + i);
             producer.send(message);
         }
-        
-        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
-        
+
         for(int i=0; i<count; i++) {
-            TextMessage msg = (TextMessage) amqConsumer.receive();
+            TextMessage msg = (TextMessage) consumer.receive();
             assertNotNull(msg);
             assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
-            
-            // force an ack when there are expired messages
-            amqConsumer.acknowledge();         
         }
-        assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
-
-        long t = protocol.getInflightCount(broker, destination);
-        assertEquals("Wrong inFlightCount: " + t, 0, t);
-        t = protocol.getDequeueCount(broker, destination);
-        assertEquals("Wrong dequeue count: " + t, 8, t);
-        //assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml?rev=1367285&r1=1367284&r2=1367285&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml Mon Jul 30 20:42:07 2012
@@ -31,6 +31,8 @@
   </virtual_host>
 
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
-  <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+  <connector id="tcp" bind="tcp://0.0.0.0:0">
+    <stomp add_redeliveries_header="JMSXDeliveryCount"/>
+  </connector>
 
 </broker>
\ No newline at end of file