You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by al94781 <an...@harmel-law.com> on 2013/05/20 14:04:16 UTC

Not Expiring JMS Messages with ActiveMQ / Camel

Hi there,

I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
ExchangePattern.InOnly messages from a JMS queue, and want to expire those
which are not processed within a given time explicitly to a named dead
letter queue.  The problem is I can't get things to expire.

I have the following route:

public class FulfillmentRequestRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

       
errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
       
from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}

And the following ActiveMQ config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
             
    
    <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
        <broker:transportConnectors>
            
            <broker:transportConnector name="vm" uri="vm://myBroker" />
            
            <broker:transportConnector name="tcp"
uri="tcp://localhost:${tcp.port}" />
        </broker:transportConnectors>
        <broker:persistenceAdapter>
            <broker:kahaPersistenceAdapter
directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
        </broker:persistenceAdapter>
        <broker:destinationPolicy>
            <broker:policyMap>
              <broker:policyEntries>
                
                <broker:policyEntry queue=">">
                  <broker:deadLetterStrategy>
                    <broker:sharedDeadLetterStrategy processExpired="true"
processNonPersistent="true" />
                  </broker:deadLetterStrategy>
                </broker:policyEntry>
              </broker:policyEntries>
            </broker:policyMap>
        </broker:destinationPolicy>
    </broker:broker>

    
    
    <bean id="jms"
class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="vm://myBroker" />
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="acceptMessagesWhileStopping" value="false"/>
    </bean>
    <bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>
    <bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://myBroker" />
    </bean>
         
</beans>

Finally I have a Unit Test which creates two messages,one which will be
processed, and the other which should time-out.

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations =
{"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws
Exception {

        // Given
        consumer = context.createConsumerTemplate();

        int expectedValidMessageCount = 3;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);        

        // When 
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT
TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest",
ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED
OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest",
ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied(); // This fails, but it sees two
messages rather than just one

        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String)
list.get(0).getIn().getBody(String.class);

        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody);          //
This also fails if I comment out the assert above
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration
{

        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}

I've been staring at this for a while, and while I think I've only been
changing one thing at a time, I may have made an error or left behind some
config which is shooting me in the foot.

One final thing to note, I have this pattern working elsewhere in tests
which explicitly throw exceptions from with transactions in Camel, but I'd
prefer not to have to manually start looking into headers myself when this
all seems to be handled already.

I hope you can help.

TIA

Cheers, Andrew



--
View this message in context: http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Not Expiring JMS Messages with ActiveMQ / Camel

Posted by al94781 <an...@harmel-law.com>.
Brilliant.  Thanks very much @ceposta.  I'll try it now and post back
(hopefully with a )

Cheers, Andrew



--
View this message in context: http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841p5732900.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Not Expiring JMS Messages with ActiveMQ / Camel

Posted by Christian Posta <ch...@gmail.com>.
Andrew,
Maybe try setting your prefetch to 1 to see if the rest of the messages
expire. Prefetch is basically a batch delivery to the consumer, with 1000
messages being default for queues. If messages are prefetched to the
consumer then they cannot be expired (unless should have already been
considered expired before dispatching to the consumer). Not sure which is
your case. To change the prefetch settings, try using
vm://localhost?jms.prefetchPolicy.queuePrefetch=1 in your connection string
to the connection factory.

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

You could also debug a little further by turning off the camel route,
sending messages to the queue, and waiting to see if activemq does expire
the messages as you're expecting.

Hope that helps...



On Tue, May 21, 2013 at 2:56 AM, al94781 <an...@harmel-law.com> wrote:

> OK, as I posted earlier, I can see the broker trying to expire messages,
> but
> nothing seems to expire. Is it because things as listed as "inflight"? (See
> below)  I'm aware that my unit test harness might not be entirely
> realistic.
> Can anyone point out anything dumb I'm doing?
>
> The inbound message has "expiration = 2" and "persistent = true".  I notice
> however that expiration has become "JMSExpiration=0" further on in
> processing which might be problematic. (I have "preserveMessageQos=true"
> set
> on the JMS uri:
>
> jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true)
>
> The following is the detail from my logs:
> 2013-05-21 09:44:07,289 [main           ] DEBUG ProducerCache
> - >>>> Endpoint[jms://queue:fulfillmentRequest] Exchange[Message: <?xml
> version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>]
> 2013-05-21 09:44:07,290 [main           ] DEBUG
> Configuration$CamelJmsTemplate - Executing callback on JMS Session:
> PooledSession { ActiveMQSession
> {id=ID:LTPW764AHARMEL-65261-1369125844695-9:1:1,started=false} }
> 2013-05-21 09:44:07,290 [main           ] DEBUG JmsConfiguration
> - Sending JMS message to: queue://fulfillmentRequest with message:
> ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId =
> null, originalDestination = null, originalTransactionId = null, producerId
> =
> null, destination = null, transactionId = null, expiration = 2, timestamp =
> 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null,
> replyTo = null, persistent = true, type = null, priority = 0, groupID =
> null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> = null, content = null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter = 0, size = 0, properties =
> {breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5},
> readOnlyProperties
> = false, readOnlyBody = false, droppable = false, text = <?xml
> version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}
> 2013-05-21 09:44:07,290 [main           ] DEBUG TransactionContext
> - Begin:TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3
> 2013-05-21 09:44:07,290 [main           ] DEBUG ActiveMQSession
> - ID:LTPW764AHARMEL-65261-1369125844695-9:1:1 Transaction Commit
> :TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3
> 2013-05-21 09:44:07,291 [main           ] DEBUG TransactionContext
> - Commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3 syncCount: 0
> 2013-05-21 09:44:07,291 [://myBroker#9-2] DEBUG LocalTransaction
> - commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3 syncCount: 2
> 2013-05-21 09:44:07,292 [://myBroker#9-2] DEBUG Queue
> - myBroker Message ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 sent to
> queue://fulfillmentRequest
> 2013-05-21 09:44:07,292 [yBroker] Task-2] DEBUG Queue
> - fulfillmentRequest toPageIn: 1, Inflight: 2, pagedInMessages.size 2,
> enqueueCount: 3, dequeueCount: 0
> 2013-05-21 09:44:07,292 [main           ] DEBUG ActiveMQSession
> - ID:LTPW764AHARMEL-65261-1369125844695-9:1:1 Transaction Rollback,
> txid:null
> 2013-05-21 09:44:07,292 [main           ] INFO  MockEndpoint
> - Asserting: Endpoint[mock://initialProcessor] is satisfied
> 2013-05-21 09:44:07,292 [main           ] DEBUG MockEndpoint
> - Waiting on the latch for: 0 millis
> 2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue
> - queue://fulfillmentRequest expiring messages ..
> 2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue
> - fulfillmentRequest toPageIn: 0, Inflight: 3, pagedInMessages.size 3,
> enqueueCount: 3, dequeueCount: 0
> 2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue
> - queue://fulfillmentRequest expiring messages done.
> 2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue
> - queue://fulfillmentRequest expiring messages ..
> 2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue
> - fulfillmentRequest toPageIn: 0, Inflight: 3, pagedInMessages.size 3,
> enqueueCount: 3, dequeueCount: 0
> 2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue
> - queue://fulfillmentRequest expiring messages done.
>
> ... <snip>
>
> 2013-05-21 09:44:07,693 [illmentRequest]] DEBUG EndpointMessageListener
> -
>
> Endpoint[jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true]
> consumer received JMS message: ActiveMQTextMessage {commandId = 12,
> responseRequired = false, messageId =
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3, originalDestination =
> null,
> originalTransactionId = null, producerId =
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1, destination =
> queue://fulfillmentRequest, transactionId =
> TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3, expiration = 0, timestamp =
> 1369125847290, arrival = 0, brokerInTime = 1369125847291, brokerOutTime =
> 1369125847693, correlationId = null, replyTo = null, persistent = true,
> type
> = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId =
> null, compressed = false, userID = null, content =
> org.apache.activemq.util.ByteSequence@4f2a858f, marshalledProperties =
> org.apache.activemq.util.ByteSequence@70a697e3, dataStructure = null,
> redeliveryCounter = 0, size = 1149, properties =
> {breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5},
> readOnlyProperties
> = true, readOnlyBody = true, droppable = false, text = <?xml
> version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}
>
>
> ... <snip>
>
> 13-05-21 09:44:07,692 [://myBroker#7-2] DEBUG Queue
> - queue://fulfillmentRequest add sub: QueueSubscription:
> consumer=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3, destinations=0,
> dispatched=0, delivered=0, pending=0, dequeues: 2, dispatched: 2, inflight:
> 0
> 2013-05-21 09:44:07,692 [yBroker] Task-1] DEBUG Queue
> - fulfillmentRequest toPageIn: 0, Inflight: 0, pagedInMessages.size 1,
> enqueueCount: 3, dequeueCount: 2
> 2013-05-21 09:44:07,693 [illmentRequest]] DEBUG TransactionContext
> - Begin:TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3
> 2013-05-21 09:44:07,693 [illmentRequest]] DEBUG
> ultJmsMessageListenerContainer - Received message of type [class
> org.apache.activemq.command.ActiveMQTextMessage] from consumer
> [PooledMessageConsumer { ActiveMQMessageConsumer {
> value=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3, started=true } }] of
> session [PooledSession { ActiveMQSession
> {id=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1,started=true} }]
> 2013-05-21 09:44:07,693 [illmentRequest]] DEBUG EndpointMessageListener
> -
>
> Endpoint[jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true]
> consumer received JMS message: ActiveMQTextMessage {commandId = 12,
> responseRequired = false, messageId =
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3, originalDestination =
> null,
> originalTransactionId = null, producerId =
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1, destination =
> queue://fulfillmentRequest, transactionId =
> TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3, expiration = 0, timestamp =
> 1369125847290, arrival = 0, brokerInTime = 1369125847291, brokerOutTime =
> 1369125847693, correlationId = null, replyTo = null, persistent = true,
> type
> = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId =
> null, compressed = false, userID = null, content =
> org.apache.activemq.util.ByteSequence@4f2a858f, marshalledProperties =
> org.apache.activemq.util.ByteSequence@70a697e3, dataStructure = null,
> redeliveryCounter = 0, size = 1149, properties =
> {breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5},
> readOnlyProperties
> = true, readOnlyBody = true, droppable = false, text = <?xml
> version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionErrorHandler
> - Transaction begin (0x3ab8dd3f) redelivered(false) for (MessageId:
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 on ExchangeId:
> ID-LTPW764AHARMEL-65263-1369125845761-0-9))
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG SendProcessor
> - >>>> Endpoint[mock://initialProcessor] Exchange[JmsMessage[JmsMessageID:
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3]]
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG MockEndpoint
> - mock://initialProcessor >>>> 2 : Exchange[JmsMessage[JmsMessageID:
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3]] with body: <?xml
> version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body> and
> headers:{JMSDestination=queue://fulfillmentRequest, JMSXGroupID=null,
> JMSPriority=4,
> JMSMessageID=ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3,
> JMSExpiration=0, JMSReplyTo=null,
> breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5,
> JMSRedelivered=false, JMSDeliveryMode=2, JMSCorrelationID=null,
> JMSType=null, JMSTimestamp=1369125847290}
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionErrorHandler
> - Transaction commit (0x3ab8dd3f) redelivered(false) for (MessageId:
> ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 on ExchangeId:
> ID-LTPW764AHARMEL-65263-1369125845761-0-9))
> 2013-05-21 09:44:07,694 [main           ] DEBUG MockEndpoint
> - Took 402 millis to complete latch
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG ActiveMQSession
> - ID:LTPW764AHARMEL-65261-1369125844695-7:1:1 Transaction Commit
> :TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3
> 2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionContext
> - Commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3 syncCount: 1
> 2013-05-21 09:44:07,695 [://myBroker#7-2] DEBUG LocalTransaction
> - commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3 syncCount: 2
> 2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQMessageConsumer
> - remove: ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3,
> lastDeliveredSequenceId:11
> 2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQSession
> - ID:LTPW764AHARMEL-65261-1369125844695-7:1:1 Transaction Rollback,
> txid:null
> 2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQSession
> - ID:LTPW764AHARMEL-65261-1369125844695-3:4:1 Transaction Commit :null
>
> I hope this makes sense and that someone can help.
>
> TIA
>
> Cheers, Andrew
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841p5732881.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: Not Expiring JMS Messages with ActiveMQ / Camel

Posted by al94781 <an...@harmel-law.com>.
OK, as I posted earlier, I can see the broker trying to expire messages, but
nothing seems to expire. Is it because things as listed as "inflight"? (See
below)  I'm aware that my unit test harness might not be entirely realistic. 
Can anyone point out anything dumb I'm doing?

The inbound message has "expiration = 2" and "persistent = true".  I notice
however that expiration has become "JMSExpiration=0" further on in
processing which might be problematic. (I have "preserveMessageQos=true" set
on the JMS uri:
jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true)

The following is the detail from my logs:
2013-05-21 09:44:07,289 [main           ] DEBUG ProducerCache                 
- >>>> Endpoint[jms://queue:fulfillmentRequest] Exchange[Message: <?xml
version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>]
2013-05-21 09:44:07,290 [main           ] DEBUG
Configuration$CamelJmsTemplate - Executing callback on JMS Session:
PooledSession { ActiveMQSession
{id=ID:LTPW764AHARMEL-65261-1369125844695-9:1:1,started=false} }
2013-05-21 09:44:07,290 [main           ] DEBUG JmsConfiguration              
- Sending JMS message to: queue://fulfillmentRequest with message:
ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId =
null, originalDestination = null, originalTransactionId = null, producerId =
null, destination = null, transactionId = null, expiration = 2, timestamp =
0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null,
replyTo = null, persistent = true, type = null, priority = 0, groupID =
null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure = null,
redeliveryCounter = 0, size = 0, properties =
{breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5}, readOnlyProperties
= false, readOnlyBody = false, droppable = false, text = <?xml
version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}
2013-05-21 09:44:07,290 [main           ] DEBUG TransactionContext            
- Begin:TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3
2013-05-21 09:44:07,290 [main           ] DEBUG ActiveMQSession               
- ID:LTPW764AHARMEL-65261-1369125844695-9:1:1 Transaction Commit
:TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3
2013-05-21 09:44:07,291 [main           ] DEBUG TransactionContext            
- Commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3 syncCount: 0
2013-05-21 09:44:07,291 [://myBroker#9-2] DEBUG LocalTransaction              
- commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3 syncCount: 2
2013-05-21 09:44:07,292 [://myBroker#9-2] DEBUG Queue                         
- myBroker Message ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 sent to
queue://fulfillmentRequest
2013-05-21 09:44:07,292 [yBroker] Task-2] DEBUG Queue                         
- fulfillmentRequest toPageIn: 1, Inflight: 2, pagedInMessages.size 2,
enqueueCount: 3, dequeueCount: 0
2013-05-21 09:44:07,292 [main           ] DEBUG ActiveMQSession               
- ID:LTPW764AHARMEL-65261-1369125844695-9:1:1 Transaction Rollback,
txid:null
2013-05-21 09:44:07,292 [main           ] INFO  MockEndpoint                  
- Asserting: Endpoint[mock://initialProcessor] is satisfied
2013-05-21 09:44:07,292 [main           ] DEBUG MockEndpoint                  
- Waiting on the latch for: 0 millis
2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue                         
- queue://fulfillmentRequest expiring messages ..
2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue                         
- fulfillmentRequest toPageIn: 0, Inflight: 3, pagedInMessages.size 3,
enqueueCount: 3, dequeueCount: 0
2013-05-21 09:44:07,316 [oker] Scheduler] DEBUG Queue                         
- queue://fulfillmentRequest expiring messages done.
2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue                         
- queue://fulfillmentRequest expiring messages ..
2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue                         
- fulfillmentRequest toPageIn: 0, Inflight: 3, pagedInMessages.size 3,
enqueueCount: 3, dequeueCount: 0
2013-05-21 09:44:07,317 [oker] Scheduler] DEBUG Queue                         
- queue://fulfillmentRequest expiring messages done.

... <snip>

2013-05-21 09:44:07,693 [illmentRequest]] DEBUG EndpointMessageListener       
-
Endpoint[jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true]
consumer received JMS message: ActiveMQTextMessage {commandId = 12,
responseRequired = false, messageId =
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3, originalDestination = null,
originalTransactionId = null, producerId =
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1, destination =
queue://fulfillmentRequest, transactionId =
TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3, expiration = 0, timestamp =
1369125847290, arrival = 0, brokerInTime = 1369125847291, brokerOutTime =
1369125847693, correlationId = null, replyTo = null, persistent = true, type
= null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId =
null, compressed = false, userID = null, content =
org.apache.activemq.util.ByteSequence@4f2a858f, marshalledProperties =
org.apache.activemq.util.ByteSequence@70a697e3, dataStructure = null,
redeliveryCounter = 0, size = 1149, properties =
{breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5}, readOnlyProperties
= true, readOnlyBody = true, droppable = false, text = <?xml
version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}


... <snip>

13-05-21 09:44:07,692 [://myBroker#7-2] DEBUG Queue                         
- queue://fulfillmentRequest add sub: QueueSubscription:
consumer=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3, destinations=0,
dispatched=0, delivered=0, pending=0, dequeues: 2, dispatched: 2, inflight:
0
2013-05-21 09:44:07,692 [yBroker] Task-1] DEBUG Queue                         
- fulfillmentRequest toPageIn: 0, Inflight: 0, pagedInMessages.size 1,
enqueueCount: 3, dequeueCount: 2
2013-05-21 09:44:07,693 [illmentRequest]] DEBUG TransactionContext            
- Begin:TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3
2013-05-21 09:44:07,693 [illmentRequest]] DEBUG
ultJmsMessageListenerContainer - Received message of type [class
org.apache.activemq.command.ActiveMQTextMessage] from consumer
[PooledMessageConsumer { ActiveMQMessageConsumer {
value=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3, started=true } }] of
session [PooledSession { ActiveMQSession
{id=ID:LTPW764AHARMEL-65261-1369125844695-7:1:1,started=true} }]
2013-05-21 09:44:07,693 [illmentRequest]] DEBUG EndpointMessageListener       
-
Endpoint[jms://queue:fulfillmentRequest?explicitQosEnabled=true&preserveMessageQos=true&timeToLive=1000&transacted=true]
consumer received JMS message: ActiveMQTextMessage {commandId = 12,
responseRequired = false, messageId =
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3, originalDestination = null,
originalTransactionId = null, producerId =
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1, destination =
queue://fulfillmentRequest, transactionId =
TX:ID:LTPW764AHARMEL-65261-1369125844695-9:1:3, expiration = 0, timestamp =
1369125847290, arrival = 0, brokerInTime = 1369125847291, brokerOutTime =
1369125847693, correlationId = null, replyTo = null, persistent = true, type
= null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId =
null, compressed = false, userID = null, content =
org.apache.activemq.util.ByteSequence@4f2a858f, marshalledProperties =
org.apache.activemq.util.ByteSequence@70a697e3, dataStructure = null,
redeliveryCounter = 0, size = 1149, properties =
{breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5}, readOnlyProperties
= true, readOnlyBody = true, droppable = false, text = <?xml
version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body>}
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionErrorHandler       
- Transaction begin (0x3ab8dd3f) redelivered(false) for (MessageId:
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 on ExchangeId:
ID-LTPW764AHARMEL-65263-1369125845761-0-9))
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG SendProcessor                 
- >>>> Endpoint[mock://initialProcessor] Exchange[JmsMessage[JmsMessageID:
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3]]
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG MockEndpoint                  
- mock://initialProcessor >>>> 2 : Exchange[JmsMessage[JmsMessageID:
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3]] with body: <?xml
version="1.0"?><body>!!!!!TIMED OUT AGAIN!!!!!</body> and
headers:{JMSDestination=queue://fulfillmentRequest, JMSXGroupID=null,
JMSPriority=4, JMSMessageID=ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3,
JMSExpiration=0, JMSReplyTo=null,
breadcrumbId=ID-LTPW764AHARMEL-65263-1369125845761-0-5,
JMSRedelivered=false, JMSDeliveryMode=2, JMSCorrelationID=null,
JMSType=null, JMSTimestamp=1369125847290}
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionErrorHandler       
- Transaction commit (0x3ab8dd3f) redelivered(false) for (MessageId:
ID:LTPW764AHARMEL-65261-1369125844695-9:1:1:1:3 on ExchangeId:
ID-LTPW764AHARMEL-65263-1369125845761-0-9))
2013-05-21 09:44:07,694 [main           ] DEBUG MockEndpoint                  
- Took 402 millis to complete latch
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG ActiveMQSession               
- ID:LTPW764AHARMEL-65261-1369125844695-7:1:1 Transaction Commit
:TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3
2013-05-21 09:44:07,694 [illmentRequest]] DEBUG TransactionContext            
- Commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3 syncCount: 1
2013-05-21 09:44:07,695 [://myBroker#7-2] DEBUG LocalTransaction              
- commit: TX:ID:LTPW764AHARMEL-65261-1369125844695-7:1:3 syncCount: 2
2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQMessageConsumer       
- remove: ID:LTPW764AHARMEL-65261-1369125844695-7:1:1:3,
lastDeliveredSequenceId:11
2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQSession               
- ID:LTPW764AHARMEL-65261-1369125844695-7:1:1 Transaction Rollback,
txid:null
2013-05-21 09:44:07,695 [illmentRequest]] DEBUG ActiveMQSession               
- ID:LTPW764AHARMEL-65261-1369125844695-3:4:1 Transaction Commit :null

I hope this makes sense and that someone can help.

TIA

Cheers, Andrew



--
View this message in context: http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841p5732881.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Not Expiring JMS Messages with ActiveMQ / Camel

Posted by al94781 <an...@harmel-law.com>.
Hi @ceposta,

I've just put that line in my config and I can now see ActiveMQ saying
"queue://fulfillmentRequest expiring messages ...".  My test is still
failing, but it's probably because I'm being an idiot somewhere.  Thanks for
your help.

I'll post back the eventual outcome....

Cheers, Andrew



--
View this message in context: http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841p5732845.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Not Expiring JMS Messages with ActiveMQ / Camel

Posted by Christian Posta <ch...@gmail.com>.
It seems you're expecting the broker to expire the messages?
By default the expiration task for a queue will run every 30s.. if your
test is taking less time than that, you'll want to tune the broker to check
for expiration a little sooner:

<policyEntry .... expireMessagesPeriod="1s" />


On Mon, May 20, 2013 at 5:04 AM, al94781 <an...@harmel-law.com> wrote:

> Hi there,
>
> I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
> ExchangePattern.InOnly messages from a JMS queue, and want to expire those
> which are not processed within a given time explicitly to a named dead
> letter queue.  The problem is I can't get things to expire.
>
> I have the following route:
>
> public class FulfillmentRequestRoute extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>
>
> errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
>
>
> from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
>             .transacted()
>             .to("mock:initialProcessor");
>     }
> }
>
> And the following ActiveMQ config:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:broker="http://activemq.apache.org/schema/core"
>        xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
>                            http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
>
>
>     <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
>         <broker:transportConnectors>
>
>             <broker:transportConnector name="vm" uri="vm://myBroker" />
>
>             <broker:transportConnector name="tcp"
> uri="tcp://localhost:${tcp.port}" />
>         </broker:transportConnectors>
>         <broker:persistenceAdapter>
>             <broker:kahaPersistenceAdapter
> directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
>         </broker:persistenceAdapter>
>         <broker:destinationPolicy>
>             <broker:policyMap>
>               <broker:policyEntries>
>
>                 <broker:policyEntry queue=">">
>                   <broker:deadLetterStrategy>
>                     <broker:sharedDeadLetterStrategy processExpired="true"
> processNonPersistent="true" />
>                   </broker:deadLetterStrategy>
>                 </broker:policyEntry>
>               </broker:policyEntries>
>             </broker:policyMap>
>         </broker:destinationPolicy>
>     </broker:broker>
>
>
>
>     <bean id="jms"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="brokerURL" value="vm://myBroker" />
>         <property name="transacted" value="true"/>
>         <property name="transactionManager" ref="jmsTransactionManager"/>
>         <property name="acceptMessagesWhileStopping" value="false"/>
>     </bean>
>     <bean id="jmsTransactionManager"
> class="org.springframework.jms.connection.JmsTransactionManager">
>         <property name="connectionFactory" ref="jmsConnectionFactory"/>
>     </bean>
>     <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory">
>         <property name="brokerURL" value="vm://myBroker" />
>     </bean>
>
> </beans>
>
> Finally I have a Unit Test which creates two messages,one which will be
> processed, and the other which should time-out.
>
> @RunWith(CamelSpringJUnit4ClassRunner.class)
> @ContextConfiguration(locations =
> {"classpath:/META-INF/spring/camel-server.xml"})
> public class FulfillmentRequestTimeoutTest {
>
>     @EndpointInject(uri = "mock:initialProcessor")
>     protected MockEndpoint mockEndpoint;
>
>     @Produce
>     protected ProducerTemplate template;
>
>     protected ConsumerTemplate consumer;
>
>     @Autowired
>     @Qualifier("camel-server")
>     protected CamelContext context;
>
>     @DirtiesContext
>     @Test
>     public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws
> Exception {
>
>         // Given
>         consumer = context.createConsumerTemplate();
>
>         int expectedValidMessageCount = 3;
>         mockEndpoint.expectedMessageCount(expectedValidMessageCount);
>
>         // When
>         String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT
> TIMEOUT</body>";
>         template.sendBody("jms:queue:fulfillmentRequest",
> ExchangePattern.InOnly, xmlBody1);
>
>         long ttl = System.currentTimeMillis() - 12000000;
>         String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED
> OUT!!!!!</body>";
>         template.sendBodyAndHeader("jms:queue:fulfillmentRequest",
> ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);
>
>         // Then
>         // The second message is not processed
>         mockEndpoint.assertIsSatisfied(); // This fails, but it sees two
> messages rather than just one
>
>         List<Exchange> list = mockEndpoint.getReceivedExchanges();
>         String notTimedOutMessageBody = (String)
> list.get(0).getIn().getBody(String.class);
>
>         assertEquals(xmlBody1, notTimedOutMessageBody);
>
>         Thread.sleep(5000);
>
>         // And is instead routed to the timedOut JMS queue
>         Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
>         assertNotNull("Should not lose the message", dlqBody);          //
> This also fails if I comment out the assert above
>         assertEquals(xmlBody2, dlqBody);
>     }
>
>     @Configuration
>     public static class ContextConfig extends SingleRouteCamelConfiguration
> {
>
>         @Bean
>         public RouteBuilder route() {
>             return new FulfillmentRequestRoute();
>         }
>     }
> }
>
> I've been staring at this for a while, and while I think I've only been
> changing one thing at a time, I may have made an error or left behind some
> config which is shooting me in the foot.
>
> One final thing to note, I have this pattern working elsewhere in tests
> which explicitly throw exceptions from with transactions in Camel, but I'd
> prefer not to have to manually start looking into headers myself when this
> all seems to be handled already.
>
> I hope you can help.
>
> TIA
>
> Cheers, Andrew
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta