You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2016/09/06 08:56:20 UTC
[jira] [Commented] (AMQ-6417) Timeout when consuming from expired
messages route
[ https://issues.apache.org/jira/browse/AMQ-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466868#comment-15466868 ]
Gary Tully commented on AMQ-6417:
---------------------------------
Enable trace-level logging for org.apache.activemq.broker.region.
Either the advisory name is incorrect, or it gets one message and the second is a duplicate because the expiry check is on dispatch.
There is a nice unit test at: org.apache.activemq.broker.policy.DeadLetterExpiryTest
Also, by default, non-persistent messages are not sent to the DLQ, so they won't produce an advisory (I think). See:
org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy#setProcessNonPersistent
> Timeout when consuming from expired messages route
> --------------------------------------------------
>
> Key: AMQ-6417
> URL: https://issues.apache.org/jira/browse/AMQ-6417
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.13.1, 5.13.2, 5.13.3, 5.13.4, 5.14.0
> Reporter: Henrik
> Labels: exception, expired, timeout
>
> Starting from ActiveMQ 5.13.1, we get a timeout when consuming expired messages. I'm at a loss as to how to resolve this, so any help is appreciated. Here's my test:
> {code}
> package no.foo.integration.module
> import no.foo.integration.FooIntegrationApplication
> import no.foo.test_helpers.PortSelector
> import org.apache.camel.EndpointInject
> import org.apache.camel.builder.AdviceWithRouteBuilder
> import org.apache.camel.component.mock.MockEndpoint
> import org.apache.camel.management.JmxSystemPropertyKeys
> import org.apache.camel.model.BeanDefinition
> import org.apache.camel.model.ModelCamelContext
> import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner
> import org.apache.camel.test.spring.UseAdviceWith
> import org.junit.*
> import org.junit.runner.RunWith
> import org.springframework.beans.factory.annotation.Autowired
> import org.springframework.jms.core.JmsTemplate
> import org.springframework.test.context.ContextConfiguration
> import org.springframework.test.context.support.AnnotationConfigContextLoader
> import static no.foo.integration.constants.Route.INCOMING
> import static no.foo.integration.constants.Route.INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY
> import static no.foo.test_helpers.SystemPropertyUtil.resetProperty
> import static no.foo.test_helpers.SystemPropertyUtil.writeProperty
> @RunWith(CamelSpringJUnit4ClassRunner.class)
> @UseAdviceWith
> @ContextConfiguration(classes = [FooIntegrationApplication.class, MockConfig.class], loader = AnnotationConfigContextLoader.class)
> public class ExpiredMessagesRouteModuleTest {
> private static final String CONSUMER_DESTINATION_ONE = "activemq:Consumer.monster.VirtualTopic.Foo.Integrations"
> private static final String CONSUMER_DESTINATION_TWO = "activemq:Consumer.monstermagnet.VirtualTopic.Foo.Integrations"
> private static int freePortBroker
> @Autowired
> private JmsTemplate jmsTemplate
> @Autowired
> private ModelCamelContext context
> @EndpointInject(uri = "mock:expiredMessages")
> private MockEndpoint expiredMessagesEndpointMock
> @BeforeClass
> public static void enableCamelJMX() {
> System.clearProperty(JmxSystemPropertyKeys.DISABLED)
> freePortBroker = PortSelector.random().select()
> writeProperty("foo-broker.port", "" + freePortBroker)
> }
> @AfterClass
> public static void tearDown() {
> resetProperty("foo-broker.port")
> }
> @Before
> public void replaceBeanWithMockEndpoint() throws Exception {
> context.getRouteDefinition(INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY.name()).adviceWith(context, new AdviceWithRouteBuilder() {
> @Override
> public void configure() throws Exception {
> weaveByType(BeanDefinition).replace().to(expiredMessagesEndpointMock)
> }
> });
> context.start()
> }
> @Test(timeout = 10000L)
> public void consume_from_expired_messages_advisory_topic() throws Exception {
> createTwoTopicConsumers()
> setUpMessagesToExpireFast()
> jmsTemplate.convertAndSend(INCOMING.destination, [SEARCH_ID_KEY: "1"])
> waitForMessageToExpire()
> expiredMessagesEndpointMock.expectedMessageCount(2)
> expiredMessagesEndpointMock.assertIsSatisfied()
> }
> private void setUpMessagesToExpireFast() {
> jmsTemplate.setExplicitQosEnabled(true)
> jmsTemplate.setTimeToLive(500L)
> }
> // Need to create topic consumer, otherwise the messages sent to the topic will be discarded before they expire.
> private void createTwoTopicConsumers() {
> context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_ONE)
> context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_TWO)
> }
> def waitForMessageToExpire() {
> Thread.sleep(1000)
> }
> }
> {code}
> And here's the exception:
> {code}
> no.foo.integration.module.ExpiredMessagesRouteModuleTest > consume_from_expired_messages_advisory_topic FAILED
> org.junit.runners.model.TestTimedOutException: test timed out after 10000 milliseconds
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
> at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1425)
> at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1409)
> at org.apache.camel.component.mock.MockEndpoint.doAssertIsSatisfied(MockEndpoint.java:405)
> at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:386)
> at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:374)
> at org.apache.camel.component.mock.MockEndpoint$assertIsSatisfied$0.call(Unknown Source)
> at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
> at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
> at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
> at no.foo.integration.module.ExpiredMessagesRouteModuleTest.consume_from_expired_messages_advisory_topic(ExpiredMessagesRouteModuleTest.groovy:77)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
> at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
> at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)