You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Henrik (JIRA)" <ji...@apache.org> on 2016/09/06 07:56:20 UTC

[jira] [Updated] (AMQ-6417) Timeout when consuming from expired messages route

     [ https://issues.apache.org/jira/browse/AMQ-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Henrik updated AMQ-6417:
------------------------
    Summary: Timeout when consuming from expired messages route  (was: Timeout when consuming from expared messages route)

> Timeout when consuming from expired messages route
> --------------------------------------------------
>
>                 Key: AMQ-6417
>                 URL: https://issues.apache.org/jira/browse/AMQ-6417
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.13.1, 5.13.2, 5.13.3, 5.13.4, 5.14.0
>            Reporter: Henrik
>              Labels: exception, expired, timeout
>
> Starting from ActiveMQ 5.13.1, we get a timeout when consuming expired messages.  I'm at a loss as to how to resolve this, so any help is appreciated.  Here's my test:
> {code}
> package no.foo.integration.module
> import no.foo.integration.FooIntegrationApplication
> import no.foo.test_helpers.PortSelector
> import org.apache.camel.EndpointInject
> import org.apache.camel.builder.AdviceWithRouteBuilder
> import org.apache.camel.component.mock.MockEndpoint
> import org.apache.camel.management.JmxSystemPropertyKeys
> import org.apache.camel.model.BeanDefinition
> import org.apache.camel.model.ModelCamelContext
> import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner
> import org.apache.camel.test.spring.UseAdviceWith
> import org.junit.*
> import org.junit.runner.RunWith
> import org.springframework.beans.factory.annotation.Autowired
> import org.springframework.jms.core.JmsTemplate
> import org.springframework.test.context.ContextConfiguration
> import org.springframework.test.context.support.AnnotationConfigContextLoader
> import static no.foo.integration.constants.Route.INCOMING
> import static no.foo.integration.constants.Route.INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY
> import static no.foo.test_helpers.SystemPropertyUtil.resetProperty
> import static no.foo.test_helpers.SystemPropertyUtil.writeProperty
> @RunWith(CamelSpringJUnit4ClassRunner.class)
> @UseAdviceWith
> @ContextConfiguration(classes = [FooIntegrationApplication.class, MockConfig.class], loader = AnnotationConfigContextLoader.class)
> public class ExpiredMessagesRouteModuleTest {
>     private static final String CONSUMER_DESTINATION_ONE = "activemq:Consumer.monster.VirtualTopic.Foo.Integrations"
>     private static final String CONSUMER_DESTINATION_TWO = "activemq:Consumer.monstermagnet.VirtualTopic.Foo.Integrations"
>     private static int freePortBroker
>     @Autowired
>     private JmsTemplate jmsTemplate
>     @Autowired
>     private ModelCamelContext context
>     @EndpointInject(uri = "mock:expiredMessages")
>     private MockEndpoint expiredMessagesEndpointMock
>     @BeforeClass
>     public static void enableCamelJMX() {
>        System.clearProperty(JmxSystemPropertyKeys.DISABLED)
>        freePortBroker = PortSelector.random().select()
>        writeProperty("foo-broker.port", "" + freePortBroker)
>     }
>     @AfterClass
>     public static void tearDown() {
>        resetProperty("foo-broker.port")
>     }
>     @Before
>     public void replaceBeanWithMockEndpoint() throws Exception {
>         context.getRouteDefinition(INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY.name()).adviceWith(context, new AdviceWithRouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                weaveByType(BeanDefinition).replace().to(expiredMessagesEndpointMock)
>             }
>         });
>         context.start()
>     }
>     @Test(timeout = 10000L)
>     public void consume_from_expired_messages_advisory_topic() throws Exception {
>         createTwoTopicConsumers()
>         setUpMessagesToExpireFast()
>         jmsTemplate.convertAndSend(INCOMING.destination, [SEARCH_ID_KEY: "1"])
>         waitForMessageToExpire()
>         expiredMessagesEndpointMock.expectedMessageCount(2)
>         expiredMessagesEndpointMock.assertIsSatisfied()
>     }
>     private void setUpMessagesToExpireFast() {
>         jmsTemplate.setExplicitQosEnabled(true)
>         jmsTemplate.setTimeToLive(500L)
>     }
>     // Need to create topic consumer, otherwise the messages sent to the topic will be discarded before they expire.
>     private void createTwoTopicConsumers() {
>         context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_ONE)
>         context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_TWO)
>     }
>     def waitForMessageToExpire() {
>         Thread.sleep(1000)
>     }
> }
> {code}
> And here's the exception:
> {code}
>  no.foo.integration.module.ExpiredMessagesRouteModuleTest > consume_from_expired_messages_advisory_topic FAILED
>      org.junit.runners.model.TestTimedOutException: test timed out after 10000 milliseconds
>          at sun.misc.Unsafe.park(Native Method)
>          at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>          at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>          at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>          at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>          at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1425)
>          at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1409)
>          at org.apache.camel.component.mock.MockEndpoint.doAssertIsSatisfied(MockEndpoint.java:405)
>          at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:386)
>          at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:374)
>          at org.apache.camel.component.mock.MockEndpoint$assertIsSatisfied$0.call(Unknown Source)
>          at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
>          at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
>          at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
>          at no.foo.integration.module.ExpiredMessagesRouteModuleTest.consume_from_expired_messages_advisory_topic(ExpiredMessagesRouteModuleTest.groovy:77)
>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:498)
>          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>          at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
>          at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
>          at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
>          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>          at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)