Posted to dev@activemq.apache.org by "Jason Shepherd (JIRA)" <ji...@apache.org> on 2013/06/05 02:08:20 UTC

[jira] [Commented] (AMQ-4533) Messages stuck in queue with redelivered=true

    [ https://issues.apache.org/jira/browse/AMQ-4533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13675455#comment-13675455 ] 

Jason Shepherd commented on AMQ-4533:
-------------------------------------

This reproducer essentially recreates a production scenario, and it needs to stay that way because it's the only way we've found to replicate the issue. If we break down what happens in the test case, hopefully you'll understand why we need so many things going on.

Step 2 in the test case is critical for reproducing the problem. It simulates a slow consumer, such as one with hung threads or a slow database operation. Without this slow consumer, the test completes very quickly and we don't see the problem.

The test simulates the production environment, where the broker is under heavy load. In the test case there is heavy load on SOME_TEST_QUEUE, all of which uses separate connection pools, message listeners, and latches. The critical messages for the test case, by contrast, are on INPUT_QUEUE and RECEIPT_QUEUE, which use OtherMessageListener and onOtherMessageLatch.

This test case is the only evidence of a problem which has been plaguing our production environment for years.

Here is how the TC works:

1) start AMQ brokers

2) start the Spring consumer that consumes from INPUT_QUEUE and puts receipts on RECEIPT_QUEUE
This consumer processes the first message forever:
	if (isStuckOld != isStuck) {
		log4jLogger.info("sleep for eternity");
		Thread.sleep(100000000);
	}
and it takes longer /30 seconds each/ over the next 49 messages than over the rest of the 1000 msgs.

	if (nrProcessed < 50) {
		Thread.sleep(30000);
	}

The reason for this is to simulate the slow and stuck consumers that we see in production.
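
To make the intent of the two snippets in step 2 concrete, here is a minimal standalone sketch of the delay schedule. It is not the code from the attached test: the class name SlowConsumerDelay, its constants, and the method nextDelayMillis are made up for illustration, and it assumes that the first message is the stuck one and the following 49 are the slow ones.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not from the attached test): it only decides how long
// a consumer should stall before handling the next message.
final class SlowConsumerDelay {
    static final long STUCK_MILLIS = 100_000_000L; // the "sleep for eternity" value
    static final long SLOW_MILLIS = 30_000L;       // 30 seconds per slow message
    static final int SLOW_MESSAGES = 50;           // assumption: 1 stuck + 49 slow

    private final AtomicInteger nrProcessed = new AtomicInteger();

    /** Returns the stall, in milliseconds, for the next message to be processed. */
    long nextDelayMillis() {
        int index = nrProcessed.getAndIncrement();
        if (index == 0) {
            return STUCK_MILLIS; // first message: effectively never finishes
        }
        if (index < SLOW_MESSAGES) {
            return SLOW_MILLIS;  // messages 2..50: slow processing
        }
        return 0L;               // remaining messages: no artificial delay
    }
}
```

A listener would call Thread.sleep(delay.nextDelayMillis()) at the start of onMessage. Keeping the schedule separate from the sleep makes it possible to check the schedule without actually waiting.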

3) run heavy traffic to SOME.TEST.QUEUE 
a) put 1000000 messages using 50 threads to broker 1

sendInThreads(QUEUE_NAME, 500000, 50, DeliveryMode.NON_PERSISTENT,getUrl1(),1);
sendInThreads(QUEUE_NAME, 500000, 50, DeliveryMode.PERSISTENT,getUrl1(),1);

b) get 900000 messages in 200 listening threads from broker 2 using OtherMessageListener

listenInThreads(QUEUE_NAME, 900000, 200,getUrl2(), mListener, otherConnections);

However, we do not wait on these latches, as it is not important. This traffic only simulates heavy load, so there is no need to count the messages.

4) send traffic to INPUT_QUEUE in 10 threads on broker 1, which triggers our Spring consumer from step 2 to start consuming and to send output messages to RECEIPT_QUEUE /one receipt message is sent to RECEIPT_QUEUE for each message consumed from INPUT_QUEUE/.

	CountDownLatch latch1=sendInThreads(queueName, msgCount, threadCount, persistance,getUrl1(),500);

latch1 is checked later, in step 6, to ensure that all 1000 messages were sent there.

5) listen on RECEIPT_QUEUE in 10 threads on broker 2 using MainMessageListener, with onMainMessageLatch set to 999 /one message should be stuck on the consumer forever by TC design/

	CountDownLatch latch=listenInThreads(queueName, msgCount, threadCount,getUrl2(),msgListener,connections);

6) wait for latches - because of the heavy load we had to set the timeout long enough /here 1000 seconds/ to give the system a chance to process all messages from INPUT_QUEUE /the processing might be very slow because of the heavy load/
	onMainMessageLatch.await(1000, TimeUnit.SECONDS);
and we also check that all messages were put into INPUT_QUEUE for processing
	latch1.await(msgCount/10, TimeUnit.SECONDS); // .MINUTES
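
The latch handling in steps 4 to 6 can be sketched with plain java.util.concurrent, without a broker. This is only an illustration under assumptions: LatchWaitSketch, runInThreads and awaitAndReport are hypothetical names, not the sendInThreads/listenInThreads helpers from the attached test, and the per-message work is a placeholder Runnable. It shows that CountDownLatch.await(timeout, unit) returns false on timeout, so the result (or the remaining count) can be inspected before the queue assertions run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not the attached test's helpers).
final class LatchWaitSketch {

    /** Starts threadCount workers that together count the latch down msgCount times. */
    static CountDownLatch runInThreads(int msgCount, int threadCount, Runnable perMessage) {
        CountDownLatch latch = new CountDownLatch(msgCount);
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int t = 0; t < threadCount; t++) {
            // Spread msgCount over the workers; the first (msgCount % threadCount) take one extra.
            int share = msgCount / threadCount + (t < msgCount % threadCount ? 1 : 0);
            pool.execute(() -> {
                for (int i = 0; i < share; i++) {
                    perMessage.run();   // placeholder for sending or receiving one message
                    latch.countDown();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; submitted workers still run to completion
        return latch;
    }

    /** Waits for the latch; returns 0 on success, otherwise the count still outstanding. */
    static long awaitAndReport(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (latch.await(timeout, unit)) {
                return 0L;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return latch.getCount();
    }
}
```

In a test like this one, a non-zero result from awaitAndReport would indicate that the wait timed out (or was interrupted) with messages still outstanding, which helps separate "timed out under load" from "queue not empty" when reading a failure.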

7) check assertions for INPUT_QUEUE and RECEIPT_QUEUE - they should be empty

	queueName = INPUT_QUEUE;

	queueSize1 = queueSize(brokerService1, queueName);
	queueSize2 = queueSize(brokerService2, queueName);

	Assert.assertEquals("Still messages in " + queueName, 0, queueSize1 + queueSize2);

	queueName = RECEIPT_QUEUE;

	queueSize1 = queueSize(brokerService1, queueName);
	queueSize2 = queueSize(brokerService2, queueName);

	Assert.assertEquals("Still messages in " + queueName, 0, queueSize1 + queueSize2);


8) close AMQ brokers

                
> Messages stuck in queue with redelivered=true
> ---------------------------------------------
>
>                 Key: AMQ-4533
>                 URL: https://issues.apache.org/jira/browse/AMQ-4533
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.7.0
>         Environment: Fuse Message Broker 5.7.0
>            Reporter: Jason Shepherd
>         Attachments: AMQ4533TestPatch.txt, AMQ4533TestPatch.txt, kahaPendingMessages.zip
>
>
> We're getting messages stuck in queues with the 
> redelivery flag set to true.
> We used the following test model: put 50 messages sequentially every 1 second, and after that, the rest of the 1000 msgs quickly to INPUT_QUEUE, 
> while starting 25 listeners consuming from INPUT_QUEUE, which take about 30 seconds to move each message to RECEIPT_QUEUE; 10 other listeners on RECEIPT_QUEUE consume and count them.
> We tried making one of the consumers slow by setting the 
> processing time to 100000 seconds (sleep) and putting a heavy load in 
> 500 threads every 1 ms on some other queues at the same time.
> Our test case is attached; you might need to install some dependencies 
> to the local maven repository manually:
>      mvn install:install-file -DgroupId=org.apache.activemq 
> -DartifactId=activemq-core -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar 
> -Dfile=activemq-core-5.7.0.fuse-71-047.jar
>      mvn install:install-file -DgroupId=org.apache.kahadb 
> -DartifactId=kahadb -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar 
> -Dfile=kahadb-5.7.0.fuse-71-047.jar
>      mvn install:install-file 
> -DgroupId=org.apache.geronimo.management.specs 
> -DartifactId=geronimo-j2ee-management_1.1_spec -Dversion=1.0.1 
> -Dpackaging=jar -Dfile=geronimo-j2ee-management_1.1_spec-1.0.1.jar
>      mvn install:install-file -DgroupId=org.apache.activemq.pool 
> -DartifactId=activemq-pool -Dversion=5.7.0-fuse-71-047 -Dpackaging=jar 
> -Dfile=activemq-pool-5.7.0.fuse-71-047.jar
> To run the test, simply use the Maven test target:
>     mvn clean test
> If the problem occurs then you'll get a message like this in the test 
> results (target/surefire-reports):
>     java.lang.AssertionError: Still messages in InputQueue expected:<0> 
> but was:<365>
