You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by wing-tung Leung <wi...@gmail.com> on 2012/01/05 15:21:44 UTC

keep track of JMSMessageID with InOnly exchange

We have quite some routes which end with a simple fire and forget
towards a JMS queue. In general, this works fine.

But there is one more thing we would like to do afterwards, and that's
capture and log the JMS message ID in some audit table. For the
logging, we can use some custom bean, but the problem is: HOW can we
get grip on the message ID, which is assigned by the JMS provider (in
this case WebSphere MQ) ?

When looking into the code from JmsProducer.setMessageId(), the MQ
message ID is only available in request-reply configuration, and not
with the "InOnly" case. Is there some "easy" way to capture the ID
from the external JMS provider? Or is the only alternative using self
generated message ID's ?

(Camel 2.6.0 on WebSphere AS 6.1 and WebSphere MQ)

Many thanks in advance!

Tung

Re: keep track of JMSMessageID with InOnly exchange

Posted by dmhatre <da...@gmail.com>.
Hi there,
    So did you guys found out the way how to implement this.
I tried the patch but i get the following error:
Also pasting my configuration and java code. Please help.

java.lang.UnsupportedOperationException: JMSCC0029: A destination must be
specified when sending from this producer.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)[:1.6.0_32]
	at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)[:1.6.0_32]
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)[:1.6.0_32]
	at
java.lang.reflect.Constructor.newInstance(Constructor.java:513)[:1.6.0_32]
	at
com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:313)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:390)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:104)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.checkNotUnidentifiedProducer(JmsMessageProducerImpl.java:1058)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send_(JmsMessageProducerImpl.java:725)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:406)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:299)[com.ibm.mqjms.jar:7.0.1.0
- k000-L090724]
	at
org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:274)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:213)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$1(JmsConfiguration.java:203)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$2.doInJms(JmsConfiguration.java:179)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:177)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:375)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)[file:/C:/workspace/OceanviewEAI/bin/:]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:285)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:81)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:50)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.impl.DefaultScheduledPollConsumer.poll(DefaultScheduledPollConsumer.java:64)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:138)[camel-core-2.9.2.jar:2.9.2]
	at
org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:90)[camel-core-2.9.2.jar:2.9.2]
	at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
	at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_32]
	at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_32]
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_32]
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)[:1.6.0_32]
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)[:1.6.0_32]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
	at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]

route
=================================================
 	<route>
	<from uri="bean:readyOutgoingMessageSql?method=getRawMessage"/>
	<to uri="bean:readyOutgoingMessageSql?method=getMessageFormat"/>
	<process ref="readyOutgoingMessageProcessor"/>
	<choice>
	<when>
		<simple>${in.header.mqDefId} == '1'</simple>
                	<to uri="dtccPol-JMS:queue:LQ_SPO"/>
                	<to uri="bean:printer?method=printFileNames" />
	</when>					
	<when>
		<simple>${in.header.mqDefId} == '2'</simple>
                	<to uri="dtccDox-JMS:queue:LQ_DOX"/>
                	<to uri="bean:printer?method=printFileNames" />
	</when>
	<otherwise>
                	<to uri="bean:printer?method=printFileNames" />
            	</otherwise>
	</choice>	
	</route> 

Processor 
=======================================
public class OutgoingMessageProcessor  implements Processor {
	
	private static Logger log =
Logger.getLogger(OutgoingMessageProcessor.class);

	@Override
	public void process(Exchange ex) throws Exception {
		
		if(ex.getIn().getBody()!=null){
			Map<String, Object> m = ex.getProperties();
			WrappedMessageModel wm = (WrappedMessageModel)ex.getIn().getBody();
			
			if(wm.getMessage()!=null && wm.getMessageFormat()!=null){

				ex.setPattern(ExchangePattern.InOnly);			
				ex.getIn().setHeader("mqDefId", wm.getMessageFormat().getMqDefId());	
				
				ex.getIn().setBody(wm.getMessage().getMessage());
			}
		}			
	}
}

where get message return byte[]

JMS Configuration
===========================================

<bean id="dtccDoxConnectionFactory"
class="com.ibm.mq.jms.MQXAQueueConnectionFactory">
		<property name="hostName" value="127.0.0.1" />
		<property name="port" value="1414" />
		<property name="queueManager" value="booradley" />
		<property name="channel" value="booradley" />
		<property name="transportType" value="1" />
	</bean>
	<bean id="atomikosDtccDoxConnectionFactory"
class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"
destroy-method="close"> 
  		<property name="xaConnectionFactory"><ref
bean="dtccDoxConnectionFactory"/></property>
  		<property name="uniqueResourceName" value="DTCC_DOX_JMS"/>
  		<property name="poolSize" value="50"/>
	</bean>
	<bean id="dtccDoxJmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
		<property name="connectionFactory" ref="atomikosDtccDoxConnectionFactory"
/>
		<property name="transactionManager" ref="JtaTransactionManager"/>
	</bean>
	<bean id="dtccDox-JMS" class="org.apache.camel.component.jms.JmsComponent">
		<property name="configuration" ref="dtccDoxJmsConfig" />
	</bean>

--
View this message in context: http://camel.465427.n5.nabble.com/keep-track-of-JMSMessageID-with-InOnly-exchange-tp5122775p5714569.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: keep track of JMSMessageID with InOnly exchange

Posted by wing-tung Leung <wi...@gmail.com>.
On Wed, Jan 18, 2012 at 4:53 PM, Claus Ibsen <cl...@gmail.com> wrote:
> A possible issue is that the JMSMessageID is only available when the
> JMS Client have really sent the message, and some clients can be in an
> async mode, where they may take a little while before sending. So in
> that case I would assume Camel will have to wait for that to happen.

Sorry to respond so late, lost track of it in the huge pile of camel
mailing  .. ;-)

I did not take into account that the sending could run asynchronously
for some JMS clients. You are right that retrieving the JMSMessageID
in that case could block the process a little.

> Unless the Spring Sent callback happens as a sort of pre construct
> step, and the JMS client is able to determine the JMSMessageID before
> actually sending it.

I'm afraid this is not clearly specified in the JMS spec, and we don't
have any guarantee. :-(

> Anyway its probably different a bit from JMS vendor to vendor. So a
> one shoe solution is maybe not possible.

Indeed.

> Also a JMS InOnly message, you most likely do not want to have the
> Message on the Camel Exchange changed into something else such as a
> javax.jms.Message, as you did a fire and forget. So maybe the patch
> should just enrich the message by adding a header with the
> JMSMessageID value.

Sounds reasonable. Code is written a while ago, I'm not sure, but
IIRC, I copied the full message to the output because it was easier
for logging both the JMSMessageID and the text message body. But the
body could be captured earlier of course, so having only the
JMSMessageID as header value should probably work fine as well. When I
have more time to revisit the code, I will check if I can refactor
both the patch and the route definition according to your suggestion.

> And I think there should be an option so people can turn this on|off, based on their needs.

Probably not hard to implement, based on some configuration option or
exchange property. If you have any preference, just let me know. Will
try to integrate into patch as well, but probably not very soon.

> BTW: What is your use case. You need to use the actual JMSMessageID
> for track and tracing or some sort?

Yes, merely for track and tracing. Our application fires messages to a
remote system via IBM MQ, and for audit and troubleshooting reasons,
we log the message IDs of our all the outgoing messages. We were
actually surprised Camel did not provide this out of the box, but
maybe we just have strange requirements .. ;-)

Re: keep track of JMSMessageID with InOnly exchange

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks for the patch.

A possible issue is that the JMSMessageID is only available when the
JMS Client have really sent the message, and some clients can be in an
async mode, where they may take a little while before sending. So in
that case I would assume Camel will have to wait for that to happen.

Unless the Spring Sent callback happens as a sort of pre construct
step, and the JMS client is able to determine the JMSMessageID before
actually sending it.

Anyway its probably different a bit from JMS vendor to vendor. So a
one shoe solution is maybe not possible.

Also a JMS InOnly message, you most likely do not want to have the
Message on the Camel Exchange changed into something else such as a
javax.jms.Message, as you did a fire and forget. So maybe the patch
should just enrich the message by adding a header with the
JMSMessageID value. And I think there should be an option so people
can turn this on|off, based on their needs.

BTW: What is your use case. You need to use the actual JMSMessageID
for track and tracing or some sort?


On Fri, Jan 6, 2012 at 2:18 PM, wing-tung Leung
<wi...@gmail.com> wrote:
> On Thu, Jan 5, 2012 at 3:21 PM, wing-tung Leung
> <wi...@gmail.com> wrote:
>> But there is one more thing we would like to do afterwards, and that's
>> capture and log the JMS message ID in some audit table.
>
> I finally patched the Camel JmsProducer class, with a minor
> modification of the "InOnly" behavior. In this case, the actually sent
> JMS message, containing the filled in "JMSMessageID", is copied to the
> "out" from the exchange.
>
> This enables processors further down the line to capture the message
> ID for logging.
>
> Attached the patch. Any comments/ideas to improve this further? Any
> idea if such idea could end up in the trunk? I applied it to version
> 2.6.0, but it can be applied to the current trunk without conflicts as
> well.
>
> Regards,
>
> Tung



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: keep track of JMSMessageID with InOnly exchange

Posted by wing-tung Leung <wi...@gmail.com>.
On Thu, Jan 5, 2012 at 3:21 PM, wing-tung Leung
<wi...@gmail.com> wrote:
> But there is one more thing we would like to do afterwards, and that's
> capture and log the JMS message ID in some audit table.

I finally patched the Camel JmsProducer class, with a minor
modification of the "InOnly" behavior. In this case, the actually sent
JMS message, containing the filled in "JMSMessageID", is copied to the
"out" from the exchange.

This enables processors further down the line to capture the message
ID for logging.

Attached the patch. Any comments/ideas to improve this further? Any
idea if such idea could end up in the trunk? I applied it to version
2.6.0, but it can be applied to the current trunk without conflicts as
well.

Regards,

Tung