You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/05/23 08:11:19 UTC
svn commit: r777803 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
Author: rajdavies
Date: Sat May 23 06:11:19 2009
New Revision: 777803
URL: http://svn.apache.org/viewvc?rev=777803&view=rev
Log:
Added test case for https://issues.apache.org/activemq/browse/AMQ-1936
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java (with props)
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java?rev=777803&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java Sat May 23 06:11:19 2009
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.log4j.Logger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import junit.framework.TestCase;
+/**
+ * A AMQ1936Test
+ *
+ */
+public class AMQ1936Test extends TestCase{
+ private final static Logger logger = Logger.getLogger( AMQ1936Test.class );
+ private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
+ ////--
+ //
+ private final static long TEST_MESSAGE_COUNT = 60000; // The number of test messages to use
+ //
+ ////--
+ private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
+ private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be processed within a JMS transaction
+
+ private ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CONSUMER_COUNT,CONSUMER_COUNT, Long.MAX_VALUE,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+ private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[ CONSUMER_COUNT ];
+ private BrokerService broker = null;
+ static QueueConnectionFactory connectionFactory = null;
+
+
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ broker = new BrokerService();
+ broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
+ broker.setBrokerName("test");
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.start();
+ connectionFactory = new ActiveMQConnectionFactory("vm://test");;
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+
+ if( threadPool!=null ) {
+ // signal receivers to stop
+ for( ThreadedMessageReceiver receiver: receivers) {
+ receiver.setShouldStop( true );
+ }
+
+ logger.info("Waiting for receivers to shutdown..");
+ if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
+ logger.warn("Not all receivers completed shutdown.");
+ } else {
+ logger.info("All receivers shutdown successfully..");
+ }
+ }
+
+ logger.debug("Stoping the broker.");
+
+ if( broker!=null ) {
+ broker.stop();
+ }
+ }
+
+ private void sendTextMessage( String queueName, String msg ) throws JMSException, NamingException {
+ QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
+ QueueConnection queueConnection = null;
+ QueueSession session = null;
+ QueueSender sender = null;
+ Queue queue = null;
+ TextMessage message = null;
+
+ try {
+
+ // Create the queue connection
+ queueConnection = connectionFactory.createQueueConnection();
+
+ session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
+ queue = session.createQueue(TEST_QUEUE_NAME);
+ sender = session.createSender( queue );
+ sender.setDeliveryMode( DeliveryMode.PERSISTENT );
+
+ message = session.createTextMessage( msg );
+
+ // send the message
+ sender.send( message );
+
+ if( session.getTransacted()) {
+ session.commit();
+ }
+
+ logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( )
+ + " content:" + message.getText());
+ } finally {
+ if( sender!=null ) {
+ sender.close();
+ }
+ if( session!=null ) {
+ session.close();
+ }
+ if( queueConnection!=null ) {
+ queueConnection.close();
+ }
+ }
+ }
+
+
+ public void testForDuplicateMessages( ) throws Exception {
+ final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap<String, String>( );
+ final Object lock = new Object( );
+ final CountDownLatch duplicateSignal = new CountDownLatch( 1 );
+ final AtomicInteger messageCount = new AtomicInteger( 0 );
+
+ // add 1/2 the number of our total messages
+ for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
+ if( duplicateSignal.getCount()==0 ) {
+ fail( "Duplicate message id detected" );
+ }
+ sendTextMessage( TEST_QUEUE_NAME, String.valueOf(i) );
+ }
+
+ // create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
+ // in a Map and checks for a duplicate
+ for( int i = 0; i < CONSUMER_COUNT; i++ ) {
+ receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) {
+
+ public void onMessage( Message message ) throws Exception {
+ synchronized( lock ) {
+ logger.info( "Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage)message).getText() );
+
+ messageCount.incrementAndGet();
+
+ if( messages.containsKey( message.getJMSMessageID()) ) {
+ duplicateSignal.countDown( );
+ logger.fatal( "duplicate message id detected:" + message.getJMSMessageID() );
+ fail( "Duplicate message id detected:" + message.getJMSMessageID() );
+ } else {
+ messages.put( message.getJMSMessageID(), message.getJMSMessageID() );
+ }
+ }
+ }
+ });
+ threadPool.submit( receivers[i]);
+ }
+
+ // starting adding the remaining messages
+ for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
+ if( duplicateSignal.getCount()==0) {
+ fail( "Duplicate message id detected" );
+ }
+ sendTextMessage( TEST_QUEUE_NAME, String.valueOf( i ) );
+ }
+
+ // allow some time for messages to be delivered to receivers.
+ Thread.sleep( 5000 );
+
+ assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
+ assertEquals( TEST_MESSAGE_COUNT, messageCount.get() );
+ }
+
+ private final static class ThreadedMessageReceiver implements Runnable {
+
+ private String queueName = null;
+ private IMessageHandler handler = null;
+ private AtomicBoolean shouldStop = new AtomicBoolean( false );
+
+ public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) {
+
+ this.queueName = queueName;
+ this.handler = handler;
+ }
+
+ public void run( ) {
+
+ QueueConnection queueConnection = null;
+ QueueSession session = null;
+ QueueReceiver receiver = null;
+ Queue queue = null;
+ Message message = null;
+ try {
+ try {
+
+ queueConnection = connectionFactory.createQueueConnection( );
+ // create a transacted session
+ session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE );
+ queue = session.createQueue(TEST_QUEUE_NAME);
+ receiver = session.createReceiver( queue );
+
+ // start the connection
+ queueConnection.start( );
+
+ logger.info( "Receiver " + Thread.currentThread().getName() + " connected." );
+
+ // start receive loop
+ while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) {
+ try {
+ message = receiver.receive( 200 );
+ } catch( Exception e) {
+ //
+ // ignore interrupted exceptions
+ //
+ if( e instanceof InterruptedException || e.getCause() instanceof InterruptedException ) {
+ /* ignore */
+ } else {
+ throw e;
+ }
+ }
+
+ if( message!=null && this.handler!=null ) {
+ this.handler.onMessage(message);
+ }
+
+ // commit session on successful handling of message
+ if( session.getTransacted()) {
+ session.commit();
+ }
+ }
+
+ logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." );
+
+ } finally {
+ if( receiver!=null ) {
+ try {
+ receiver.close();
+ } catch (JMSException e) {
+ logger.warn(e);
+ }
+ }
+ if( session!=null ) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ logger.warn(e);
+ }
+ }
+ if( queueConnection!=null ) {
+ queueConnection.close();
+ }
+ }
+ } catch ( JMSException e ) {
+ logger.error(e);
+ e.printStackTrace();
+ } catch (NamingException e) {
+ logger.error(e);
+ } catch (Exception e) {
+ logger.error(e);
+ e.printStackTrace();
+ }
+ }
+
+ public Boolean getShouldStop() {
+ return shouldStop.get();
+ }
+
+ public void setShouldStop(Boolean shouldStop) {
+ this.shouldStop.set(shouldStop);
+ }
+ }
+
+ public interface IMessageHandler {
+ void onMessage( Message message ) throws Exception;
+ }
+
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
------------------------------------------------------------------------------
svn:mime-type = text/plain