You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/09 16:33:28 UTC

svn commit: r1155385 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java

Author: tabish
Date: Tue Aug  9 14:33:28 2011
New Revision: 1155385

URL: http://svn.apache.org/viewvc?rev=1155385&view=rev
Log:
Give this test more time to meet its criteria

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java?rev=1155385&r1=1155384&r2=1155385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java Tue Aug  9 14:33:28 2011
@@ -62,13 +62,13 @@ public class AMQ1936Test extends TestCas
     private ThreadedMessageReceiver[] receivers     = new ThreadedMessageReceiver[ CONSUMER_COUNT ];
     private BrokerService broker                    = null;
     static QueueConnectionFactory connectionFactory = null;
-   
-   
+
+
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        
+
         broker = new BrokerService();
         broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
         broker.setBrokerName("test");
@@ -80,13 +80,13 @@ public class AMQ1936Test extends TestCas
     @Override
     protected void tearDown() throws Exception {
         super.tearDown();
-        
+
         if( threadPool!=null ) {
             // signal receivers to stop
             for( ThreadedMessageReceiver receiver: receivers) {
                 receiver.setShouldStop( true );
             }
-            
+
             logger.info("Waiting for receivers to shutdown..");
             if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
                 logger.warn("Not all receivers completed shutdown.");
@@ -94,14 +94,14 @@ public class AMQ1936Test extends TestCas
                 logger.info("All receivers shutdown successfully..");
             }
         }
-        
+
         logger.debug("Stoping the broker.");
-        
+
         if( broker!=null ) {
             broker.stop();
         }
     }
-    
+
     private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException {
         QueueConnectionFactory connectionFactory        = new ActiveMQConnectionFactory("vm://test");
         QueueConnection queueConnection                 = null;
@@ -109,22 +109,22 @@ public class AMQ1936Test extends TestCas
         QueueSender sender                              = null;
         Queue queue                                     = null;
         TextMessage message                                 = null;
-        
+
         try {
-            
+
             // Create the queue connection
             queueConnection = connectionFactory.createQueueConnection();
-    
+
             session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
             queue = session.createQueue(TEST_QUEUE_NAME);
             sender = session.createSender( queue );
             sender.setDeliveryMode( DeliveryMode.PERSISTENT );
 
             message = session.createTextMessage( String.valueOf(i) );
-            
+
             // send the message
             sender.send( message );
-    
+
             if( session.getTransacted()) {
                 session.commit();
             }
@@ -144,14 +144,14 @@ public class AMQ1936Test extends TestCas
             }
         }
     }
-     
-    
+
+
     public void testForDuplicateMessages( ) throws Exception {
         final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap<String, String>( );
         final Object lock                               = new Object( );
         final CountDownLatch duplicateSignal            = new CountDownLatch( 1 );
         final AtomicInteger messageCount                = new AtomicInteger( 0 );
-        
+
         // add 1/2 the number of our total messages
         for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
             if( duplicateSignal.getCount()==0 ) {
@@ -159,12 +159,12 @@ public class AMQ1936Test extends TestCas
             }
             sendTextMessage( TEST_QUEUE_NAME, i );
         }
-        
+
         // create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
         // in a Map and checks for a duplicate
         for( int i = 0; i < CONSUMER_COUNT; i++ ) {
             receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) {
-            
+
                 public void onMessage( Message message ) throws Exception {
                     synchronized( lock ) {
                         int current = messageCount.incrementAndGet();
@@ -183,7 +183,7 @@ public class AMQ1936Test extends TestCas
             });
             threadPool.submit( receivers[i]);
         }
-        
+
         // starting adding the remaining messages
         for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
             if( duplicateSignal.getCount()==0) {
@@ -191,38 +191,38 @@ public class AMQ1936Test extends TestCas
             }
             sendTextMessage( TEST_QUEUE_NAME, i );
         }
-        
+
         logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
-        
+
         // allow some time for messages to be delivered to receivers.
         boolean ok = Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return TEST_MESSAGE_COUNT == messages.size();
             }
-        }, 1*60*1000);
+        }, TimeUnit.MINUTES.toMillis(7));
         if (!ok) {
             AutoFailTestSupport.dumpAllThreads("--STUCK?--");
         }
         assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
         assertEquals( TEST_MESSAGE_COUNT,  messageCount.get() );
     }
-    
-    
-    
+
+
+
     private final static class ThreadedMessageReceiver implements Runnable {
-       
+
         private String queueName            = null;
         private IMessageHandler handler     = null;
         private AtomicBoolean shouldStop    = new AtomicBoolean( false );
-        
+
         public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) {
-         
+
             this.queueName      = queueName;
             this.handler        = handler;
         }
 
         public void run( ) {
-           
+
             QueueConnection queueConnection                 = null;
             QueueSession session                            = null;
             QueueReceiver receiver                          = null;
@@ -230,7 +230,7 @@ public class AMQ1936Test extends TestCas
             Message message                                 = null;
             try {
                 try {
-                 
+
                     queueConnection = connectionFactory.createQueueConnection( );
                     // create a transacted session
                     session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE );
@@ -239,9 +239,9 @@ public class AMQ1936Test extends TestCas
 
                     // start the connection
                     queueConnection.start( );
-                    
+
                     logger.info( "Receiver " + Thread.currentThread().getName() + " connected." );
-                    
+
                     // start receive loop
                     while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) {
                         try {
@@ -256,32 +256,32 @@ public class AMQ1936Test extends TestCas
                                 throw e;
                             }
                         }
-                        
+
                         if( message!=null && this.handler!=null ) {
                             this.handler.onMessage(message);
                         }
-                        
+
                         // commit session on successful handling of message
                         if( session.getTransacted()) {
                             session.commit();
                         }
                     }
-                    
+
                     logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." );
-                    
+
                 } finally {
                     if( receiver!=null ) {
                         try {
                             receiver.close();
-                        } catch (JMSException e)  { 
-                            logger.warn(e); 
+                        } catch (JMSException e)  {
+                            logger.warn(e);
                         }
                     }
                     if( session!=null ) {
                         try {
                             session.close();
-                        } catch (JMSException e)  { 
-                            logger.warn(e); 
+                        } catch (JMSException e)  {
+                            logger.warn(e);
                         }
                     }
                     if( queueConnection!=null ) {
@@ -307,10 +307,10 @@ public class AMQ1936Test extends TestCas
             this.shouldStop.set(shouldStop);
         }
     }
-    
+
     public interface IMessageHandler {
         void onMessage( Message message ) throws Exception;
     }
-    
+
 
 }