Posted to users@camel.apache.org by wing-tung Leung <wi...@gmail.com> on 2012/04/13 23:58:28 UTC

Re: keep track of JMSMessageID with InOnly exchange

On Wed, Jan 18, 2012 at 4:53 PM, Claus Ibsen <cl...@gmail.com> wrote:
> A possible issue is that the JMSMessageID is only available when the
> JMS Client have really sent the message, and some clients can be in an
> async mode, where they may take a little while before sending. So in
> that case I would assume Camel will have to wait for that to happen.

Sorry to respond so late; I lost track of it in the huge pile of Camel
mailing list messages .. ;-)

I did not take into account that the sending could run asynchronously
for some JMS clients. You are right that retrieving the JMSMessageID
in that case could block the process a little.

> Unless the Spring Sent callback happens as a sort of pre construct
> step, and the JMS client is able to determine the JMSMessageID before
> actually sending it.

I'm afraid this is not clearly specified in the JMS spec, and we don't
have any guarantee. :-(

> Anyway its probably different a bit from JMS vendor to vendor. So a
> one shoe solution is maybe not possible.

Indeed.

> Also a JMS InOnly message, you most likely do not want to have the
> Message on the Camel Exchange changed into something else such as a
> javax.jms.Message, as you did a fire and forget. So maybe the patch
> should just enrich the message by adding a header with the
> JMSMessageID value.

Sounds reasonable. The code was written a while ago and I'm not sure,
but IIRC I copied the full message to the output because that made it
easier to log both the JMSMessageID and the text message body. But the
body could of course be captured earlier, so having only the
JMSMessageID as a header value should probably work fine as well. When
I have more time to revisit the code, I will check whether I can
refactor both the patch and the route definition according to your
suggestion.

> And I think there should be an option so people can turn this on|off, based on their needs.

That is probably not hard to implement, based on some configuration
option or exchange property. If you have any preference, just let me
know. I will try to integrate it into the patch as well, but probably
not very soon.
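
To make the idea concrete, here is a minimal sketch in plain Java of
"enrich with a header, behind an on/off switch". It is not the patch and
it does not use the Camel or JMS APIs: SentMessageIdSketch, SentMessage,
enrich and the boolean flag are all hypothetical names, and the header
map only stands in for the headers on the Camel message. The body is
deliberately left alone, and the ID is only read after the send.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model of the header-enrichment idea.
 * Every name in this class is hypothetical; nothing here is Camel or JMS API.
 */
public final class SentMessageIdSketch {

    /** Header name under which the sent message ID is exposed (assumed name). */
    static final String SENT_ID_HEADER = "JMSMessageID";

    /** Stand-in for the message as the JMS client reports it after sending. */
    static final class SentMessage {
        final String messageId;

        SentMessage(String messageId) {
            this.messageId = messageId;
        }
    }

    /**
     * Models a "message sent" callback: returns a copy of the headers and,
     * when the option is enabled and an ID is known, adds the ID as a header.
     * The input map is never modified.
     */
    static Map<String, Object> enrich(Map<String, Object> headers,
                                      SentMessage sent,
                                      boolean enabled) {
        Map<String, Object> result = new HashMap<>(headers);
        if (enabled && sent.messageId != null) {
            result.put(SENT_ID_HEADER, sent.messageId);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("mqDefId", "1");

        // Made-up ID; a real client assigns its own value when it sends.
        SentMessage sent = new SentMessage("ID:example-0001");

        // Option on: the ID is available for audit logging.
        System.out.println("on:  " + enrich(headers, sent, true).get(SENT_ID_HEADER));
        // Option off: the headers are passed through unchanged.
        System.out.println("off: " + enrich(headers, sent, false).get(SENT_ID_HEADER));
    }
}
```

Whether the switch ends up as a configuration option or as an exchange
property only changes where the boolean comes from; the enrichment step
itself stays the same.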

> BTW: What is your use case. You need to use the actual JMSMessageID
> for track and tracing or some sort?

Yes, merely for tracking and tracing. Our application fires messages to
a remote system via IBM MQ, and for audit and troubleshooting reasons,
we log the message IDs of all our outgoing messages. We were actually
surprised that Camel did not provide this out of the box, but maybe we
just have strange requirements .. ;-)

Re: keep track of JMSMessageID with InOnly exchange

Posted by dmhatre <da...@gmail.com>.
Hi there,
    Did you find a way to implement this?
I tried the patch, but I get the error below.
I am also pasting my route, Java code, and JMS configuration. Please help.

java.lang.UnsupportedOperationException: JMSCC0029: A destination must be specified when sending from this producer.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.6.0_32]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)[:1.6.0_32]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)[:1.6.0_32]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:513)[:1.6.0_32]
	at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:313)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:390)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:104)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.checkNotUnidentifiedProducer(JmsMessageProducerImpl.java:1058)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send_(JmsMessageProducerImpl.java:725)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:406)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:299)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:274)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:213)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$1(JmsConfiguration.java:203)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$2.doInJms(JmsConfiguration.java:179)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:177)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:375)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)[file:/C:/workspace/OceanviewEAI/bin/:]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:285)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:81)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:50)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.impl.DefaultScheduledPollConsumer.poll(DefaultScheduledPollConsumer.java:64)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:138)[camel-core-2.9.2.jar:2.9.2]
	at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:90)[camel-core-2.9.2.jar:2.9.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_32]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_32]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_32]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)[:1.6.0_32]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)[:1.6.0_32]
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
	at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]

route
=================================================
<route>
	<from uri="bean:readyOutgoingMessageSql?method=getRawMessage"/>
	<to uri="bean:readyOutgoingMessageSql?method=getMessageFormat"/>
	<process ref="readyOutgoingMessageProcessor"/>
	<choice>
		<when>
			<simple>${in.header.mqDefId} == '1'</simple>
			<to uri="dtccPol-JMS:queue:LQ_SPO"/>
			<to uri="bean:printer?method=printFileNames" />
		</when>
		<when>
			<simple>${in.header.mqDefId} == '2'</simple>
			<to uri="dtccDox-JMS:queue:LQ_DOX"/>
			<to uri="bean:printer?method=printFileNames" />
		</when>
		<otherwise>
			<to uri="bean:printer?method=printFileNames" />
		</otherwise>
	</choice>
</route>

Processor 
=======================================
public class OutgoingMessageProcessor implements Processor {

	private static Logger log = Logger.getLogger(OutgoingMessageProcessor.class);

	@Override
	public void process(Exchange ex) throws Exception {
		if (ex.getIn().getBody() != null) {
			WrappedMessageModel wm = (WrappedMessageModel) ex.getIn().getBody();

			if (wm.getMessage() != null && wm.getMessageFormat() != null) {
				// Fire and forget: no reply is expected from the queue.
				ex.setPattern(ExchangePattern.InOnly);
				// The <choice> in the route selects the target queue from this header.
				ex.getIn().setHeader("mqDefId", wm.getMessageFormat().getMqDefId());
				// Replace the wrapper object with the raw payload.
				ex.getIn().setBody(wm.getMessage().getMessage());
			}
		}
	}
}

where wm.getMessage().getMessage() returns byte[]

JMS Configuration
===========================================

<bean id="dtccDoxConnectionFactory" class="com.ibm.mq.jms.MQXAQueueConnectionFactory">
	<property name="hostName" value="127.0.0.1" />
	<property name="port" value="1414" />
	<property name="queueManager" value="booradley" />
	<property name="channel" value="booradley" />
	<property name="transportType" value="1" />
</bean>
<bean id="atomikosDtccDoxConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close">
	<property name="xaConnectionFactory"><ref bean="dtccDoxConnectionFactory"/></property>
	<property name="uniqueResourceName" value="DTCC_DOX_JMS"/>
	<property name="poolSize" value="50"/>
</bean>
<bean id="dtccDoxJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
	<property name="connectionFactory" ref="atomikosDtccDoxConnectionFactory" />
	<property name="transactionManager" ref="JtaTransactionManager"/>
</bean>
<bean id="dtccDox-JMS" class="org.apache.camel.component.jms.JmsComponent">
	<property name="configuration" ref="dtccDoxJmsConfig" />
</bean>

--
View this message in context: http://camel.465427.n5.nabble.com/keep-track-of-JMSMessageID-with-InOnly-exchange-tp5122775p5714569.html