You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by RomualdoGobbo <Ro...@newlog.it> on 2010/09/07 12:30:53 UTC
Messages queue filtered depending on info received by another queue
Hi All,
I've the following problem unsolved regarding the filtered messages
depending on some info extractect from another queue.
The case is the following:
1. Primary queue contains the messages received from netty:tcp server
2.Depending the ID found in the body you have to select from another queue
only the messages affected to the previous ID, leaving the other messages in
the queue.
3. The returning message will be the message (if found) on the second queue,
otherwise all remains unchanged.
I've solved only partially the problem using the following camelcontext.xml:
<route>
<from uri="netty:tcp://localhost:5000?textline=true&sync=true"/>
<to uri="jms:MyQueue"/>
<pollEnrich uri="jms:toTelelinkQ" strategyRef="myBeanId"/>
</route> -->
<bean id="myBeanId" class="org.apache.camel.example.MyBeanProcessor"/>
due to the process myBeanId that is unable to leave the messages in the
original queue (see the following code), but all messages must be consumed
to know the body content:
// MyBeanProcessor.java
package org.apache.camel.example;
import java.io.*;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class MyBeanProcessor implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange == null) {
// no messages in queue
return newExchange;
}
String payloadOld = oldExchange.getIn().getBody(String.class);
String payloadNew = newExchange.getIn().getBody(String.class);
String telelink = payloadOld.substring(5, 14);
if (payloadNew.contains("#") & payloadNew.contains(telelink)) {
System.out.println("OK " + telelink + "=" + payloadNew);
oldExchange.getIn().setBody(payloadNew.substring(10));
} else {
System.out.println("NO OK " + telelink + "<>" + payloadNew);
oldExchange.getIn().setBody(null);
}
return oldExchange;
}
}
Many thanks for your help,
Romualdo
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805911.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
ERROR DUMP:
------------------------------------------------------------->
[INFO] Classpath = [file:/C:/Documents and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
[pache.camel.spring.Main.main()] MainSupport INFO Apache
Camel 2.4.0 starting
[pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO
Refreshing
org.springframework.context.support.ClassPathXmlApplicationContext@fbcb70:
startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
[pache.camel.spring.Main.main()] XmlBeanDefinitionReader INFO
Loading XML bean definitions from file [C:\Documents and
Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
[pache.camel.spring.Main.main()] CamelNamespaceHandler INFO
camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
[pache.camel.spring.Main.main()] DefaultListableBeanFactory INFO
Pre-instantiating singletons in
org.springframework.beans.factory.support.DefaultListableBeanFactory@302df5:
defining beans
[producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms];
root of factory hierarchy
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
Camel 2.4.0 (CamelContext: Telelink-context) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext INFO
Tracing is enabled on CamelContext: Telelink-context
[pache.camel.spring.Main.main()] DefaultCamelContext INFO JMX
enabled. Using ManagedManagementStrategy.
[pache.camel.spring.Main.main()] AnnotationTypeConverterLoader INFO Found
4 packages with 14 @Converter classes to load
[pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded
148 type converters in 0.311 seconds
[pache.camel.spring.Main.main()] NettyConsumer INFO Netty
consumer bound to: localhost:5000
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route1 started and consuming from: Endpoint[file://src/data]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route2 started and consuming from: Endpoint[tcp://localhost:5000]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route3 started and consuming from: Endpoint[jms://MyQueue]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route4 started and consuming from: Endpoint[file://target/outputFiles]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO
Started 4 routes
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
[ New I/O server worker #1-1] Tracer INFO
83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
RECEIVED:
$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
BodyType:String,
Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
after receiving payload: telelink ID = 550700111
[ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
at
newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents
and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
at
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
[ New I/O server worker #1-1] Tracer INFO
6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyRemoteAddress=/127.0.0.1:2287, CamelNettyMessageEvent=[id:
0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
BodyType:String,
Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
after receiving payload: telelink ID = 550700111
[ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807536.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
// TelelinkCommandBean.java
package newlog.camel;
import org.apache.camel.*;
public class TelelinkCommandBean {
private ProducerTemplate producer;
private ConsumerTemplate consumer;
public void setConsumer(ConsumerTemplate consumer) {
this.consumer = consumer;
}
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
public void getTelelinkCommand(Exchange exchange) {
String body = null;
String bodyTelelink = null;
String command = null;
String telelink = null;
// receive the message from payload exchange
body = exchange.getIn().getBody(String.class);
telelink = body.substring(5, 14);
System.out.println("after receiving payload: telelink ID = " +
telelink);
// loop to empty queue
while (true) {
// receive the message from the COMMAND queue
bodyTelelink =
consumer.receiveBody("jms:toTelelinkQ.filter(body().contains("+ telelink
+"))", String.class);
if (bodyTelelink == null) {
// no more messages in queue
break;
}
if (bodyTelelink.contains("#")) {
System.out.println("OK " + telelink + "=" + bodyTelelink);
command = bodyTelelink.substring(10);
} else {
System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
command = bodyTelelink;
}
// send it out the command
exchange.getIn().setBody(command);
}
}
}
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807534.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
camelContext.xml:
-------------------------------------------------------------------------->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<!-- START SNIPPET: example -->
<camelContext id="Telelink-context" trace="true"
xmlns="http://camel.apache.org/schema/spring">
<template id="producer" />
<consumerTemplate id="consumer" />
<route>
<!-- <from uri="file:src/data?noop=true"/> -->
<from uri="file:src/data"/>
<to uri="jms:toTelelinkQ"/>
</route>
<!-- OPEN Server Socket and send them to internal bean -->
<route>
<!-- <from uri="timer://foo?period=5000"/> -->
<from uri="netty:tcp://localhost:5000?textline=true&sync=true"/>
<bean ref="myBeanId" method="getTelelinkCommand"/>
<to uri="jms:MyQueue"/>
</route>
<!-- now command integration and lets write messages from the queue to a
directory -->
<route>
<from uri="jms:MyQueue"/>
<to uri="file:target/outputFiles"/>
</route>
<!-- now lets write messages from the directory to some queues directory
-->
<route>
<from uri="file:target/outputFiles"/>
<to uri="jms:toDroolsQ"/>
<to uri="jms:toTrackingQ"/>
</route>
</camelContext>
<!-- lets configure Bean -->
<bean id="myBeanId" class="newlog.camel.TelelinkCommandBean"/>
<!-- lets configure the default ActiveMQ broker URL -->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- <property name="brokerURL"
value="vm://localhost?broker.persistent=false"/> -->
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
</property>
</bean>
<!-- END SNIPPET: example -->
</beans>
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807533.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another queue
Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 8, 2010 at 2:22 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi,
> I'm using the camel 2.4.0 version
> Romualdo
I created an unit test you can take a look at
http://svn.apache.org/viewvc?rev=995371&view=rev
> --
> View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807776.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi,
I'm using the camel 2.4.0 version
Romualdo
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807776.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another queue
Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 8, 2010 at 2:13 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi,
> I received the same exception error as above, even if you use a consumer
> without any sort of selector (see below).
> I think that the consumer procedure for some my mistake is not istantiated,
> but I don't find the solution.
> Excluding the code line with "consumer.receiveBody" all is OK!
>
Ah we fixed something for JMS selectors with consumer template a while back.
Maybe you need a newer version of Camel.
> Many thanks for your help,
> Best Regards,
> Romualdo
>
>
>
> --------------------- BEAN---------------------------->
> // TelelinkCommandBean.java
>
> package newlog.camel;
>
> import org.apache.camel.*;
>
>
> public class TelelinkCommandBean {
> private ProducerTemplate producer;
> private ConsumerTemplate consumer;
>
> public void setConsumer(ConsumerTemplate consumer) {
> this.consumer = consumer;
> }
> public void setProducer(ProducerTemplate producer) {
> this.producer = producer;
> }
>
> public void getTelelinkCommand(Exchange exchange) {
> String body = null;
> String bodyTelelink = null;
> String command = null;
> String telelink = null;
>
> // receive the message from payload exchange
> body = exchange.getIn().getBody(String.class);
> telelink = body.substring(5, 14);
> System.out.println("after receiving payload: telelink ID = " +
> telelink);
>
>
> // loop to empty queue
> while (true) {
>
> // receive the message from the COMMAND queue
> bodyTelelink = consumer.receiveBody("file:src/data", String.class);
> if (bodyTelelink == null) {
> // no more messages in queue
> break;
> }
>
> if (bodyTelelink.contains("#")) {
> System.out.println("OK " + telelink + "=" + bodyTelelink);
> command = bodyTelelink.substring(10);
> } else {
> System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
> command = bodyTelelink;
> }
> // send it out the command
> exchange.getIn().setBody(command);
> }
> }
> }
>
> -------------------EXCEPTION ERROR----------------->
>
> [ New I/O server worker #1-1] Tracer INFO
> 38923066-b464-45fb-9a8e-0b63e45a38d9 >>> (route2) from(tcp://localhost:5000)
> --> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
> Headers:{CamelNettyMessageEvent=[id: 0x012d9844, /127.0.0.1:3013 =>
> /127.0.0.1:5000] RECEIVED:
> $DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10,
> CamelNettyRemoteAddress=/127.0.0.1:3013,
> CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@16c14e7},
> BodyType:String,
> Body:$DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10
> after receiving payload: telelink ID = 550700111
> [ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
> delivery for exchangeId: 38923066-b464-45fb-9a8e-0b63e45a38d9. Exhausted
> after delivery attempt: 1 caught: java.lang.NullPointerException
> java.lang.NullPointerException
> --
> View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807766.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi,
I received the same exception error as above, even if you use a consumer
without any sort of selector (see below).
I think that the consumer procedure for some my mistake is not istantiated,
but I don't find the solution.
Excluding the code line with "consumer.receiveBody" all is OK!
Many thanks for your help,
Best Regards,
Romualdo
--------------------- BEAN---------------------------->
// TelelinkCommandBean.java
package newlog.camel;
import org.apache.camel.*;
public class TelelinkCommandBean {
private ProducerTemplate producer;
private ConsumerTemplate consumer;
public void setConsumer(ConsumerTemplate consumer) {
this.consumer = consumer;
}
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
public void getTelelinkCommand(Exchange exchange) {
String body = null;
String bodyTelelink = null;
String command = null;
String telelink = null;
// receive the message from payload exchange
body = exchange.getIn().getBody(String.class);
telelink = body.substring(5, 14);
System.out.println("after receiving payload: telelink ID = " +
telelink);
// loop to empty queue
while (true) {
// receive the message from the COMMAND queue
bodyTelelink = consumer.receiveBody("file:src/data", String.class);
if (bodyTelelink == null) {
// no more messages in queue
break;
}
if (bodyTelelink.contains("#")) {
System.out.println("OK " + telelink + "=" + bodyTelelink);
command = bodyTelelink.substring(10);
} else {
System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
command = bodyTelelink;
}
// send it out the command
exchange.getIn().setBody(command);
}
}
}
-------------------EXCEPTION ERROR----------------->
[ New I/O server worker #1-1] Tracer INFO
38923066-b464-45fb-9a8e-0b63e45a38d9 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyMessageEvent=[id: 0x012d9844, /127.0.0.1:3013 =>
/127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10,
CamelNettyRemoteAddress=/127.0.0.1:3013,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@16c14e7},
BodyType:String,
Body:$DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10
after receiving payload: telelink ID = 550700111
[ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
delivery for exchangeId: 38923066-b464-45fb-9a8e-0b63e45a38d9. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807766.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another queue
Posted by Claus Ibsen <cl...@gmail.com>.
Hi
You have to use valid JMS selector syntax. This is standard JMS.
So google information how to do that with JMS. Its a bit limited what
you can do.
For example for ActiveMQ
http://activemq.apache.org/selectors.html
http://download.oracle.com/javaee/1.4/api/javax/jms/Message.html
On Wed, Sep 8, 2010 at 10:30 AM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi Claus,
> thank you very much for your suggestions.
> I've completely reviewed my code, but unfortunately I'm making some
> mistakes, but I don't find it: the message body don't return to the main
> context, as you can see in the code below, due to exception error.
>
> You can find below the new camelContext.xml and the new bean
> TelelinkCommandBean.java followed by the error dump.
>
> Thank a lot for your help
>
> Best Regards,
> Romualdo
>
> ------------------------------------------------------------------------------>
> camelContext.xml:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> TelelinkCommandBean.java:
>
> // TelelinkCommandBean.java
>
> package newlog.camel;
>
> import org.apache.camel.*;
>
> public class TelelinkCommandBean {
> private ProducerTemplate producer;
> private ConsumerTemplate consumer;
>
> public void setConsumer(ConsumerTemplate consumer) {
> this.consumer = consumer;
> }
>
> public void setProducer(ProducerTemplate producer) {
> this.producer = producer;
> }
>
> public void getTelelinkCommand(Exchange exchange) {
> String body = null;
> String bodyTelelink = null;
> String command = null;
> String telelink = null;
>
> // receive the message from payload exchange
> body = exchange.getIn().getBody(String.class);
> telelink = body.substring(5, 14);
> System.out.println("after receiving payload: telelink ID = " +
> telelink);
>
>
> // loop to empty queue
> while (true) {
>
> // receive the message from the COMMAND queue
> bodyTelelink =
> consumer.receiveBody("jms:toTelelinkQ.filter(body().contains("+ telelink
> +"))", String.class);
> if (bodyTelelink == null) {
> // no more messages in queue
> break;
> }
>
> if (bodyTelelink.contains("#")) {
> System.out.println("OK " + telelink + "=" + bodyTelelink);
> command = bodyTelelink.substring(10);
> } else {
> System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
> command = bodyTelelink;
> }
> // send it out the command
> exchange.getIn().setBody(command);
> }
> }
> }
>
> Error received at run-time:
>
> [INFO] Classpath = [file:/C:/Documents and
> Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
> file:/C:/Documents and
> Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
> file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
> [pache.camel.spring.Main.main()] MainSupport INFO Apache
> Camel 2.4.0 starting
> [pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO
> Refreshing
> org.springframework.context.support.ClassPathXmlApplicationContext@fbcb70:
> startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
> [pache.camel.spring.Main.main()] XmlBeanDefinitionReader INFO
> Loading XML bean definitions from file [C:\Documents and
> Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
> [pache.camel.spring.Main.main()] CamelNamespaceHandler INFO
> camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
> [pache.camel.spring.Main.main()] DefaultListableBeanFactory INFO
> Pre-instantiating singletons in
> org.springframework.beans.factory.support.DefaultListableBeanFactory@302df5:
> defining beans
> [producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms];
> root of factory hierarchy
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
> Camel 2.4.0 (CamelContext: Telelink-context) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO
> Tracing is enabled on CamelContext: Telelink-context
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO JMX
> enabled. Using ManagedManagementStrategy.
> [pache.camel.spring.Main.main()] AnnotationTypeConverterLoader INFO Found
> 4 packages with 14 @Converter classes to load
> [pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded
> 148 type converters in 0.311 seconds
> [pache.camel.spring.Main.main()] NettyConsumer INFO Netty
> consumer bound to: localhost:5000
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
> route1 started and consuming from: Endpoint[file://src/data]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
> route2 started and consuming from: Endpoint[tcp://localhost:5000]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
> route3 started and consuming from: Endpoint[jms://MyQueue]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
> route4 started and consuming from: Endpoint[file://target/outputFiles]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO
> Started 4 routes
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
> Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
> [ New I/O server worker #1-1] Tracer INFO
> 83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
> --> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
> Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
> CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
> CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
> RECEIVED:
> $DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
> BodyType:String,
> Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
> after receiving payload: telelink ID = 550700111
> [ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
> delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted
> after delivery attempt: 1 caught: java.lang.NullPointerException
> java.lang.NullPointerException
> at
> newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents
> and
> Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
> at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
> at
> org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
> at
> org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
> at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
> at
> org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
> at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
> [ New I/O server worker #1-1] Tracer INFO
> 6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
> --> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
> Headers:{CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
> CamelNettyRemoteAddress=/127.0.0.1:2287, CamelNettyMessageEvent=[id:
> 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000] RECEIVED:
> $DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
> BodyType:String,
> Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
> after receiving payload: telelink ID = 550700111
> [ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
> delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted
> after delivery attempt: 1 caught: java.lang.NullPointerException
> java.lang.NullPointerException.... and more
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807527.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi Claus,
thank you very much for your suggestions.
I've completely reviewed my code, but unfortunately I'm making some
mistakes, but I don't find it: the message body don't return to the main
context, as you can see in the code below, due to exception error.
You can find below the new camelContext.xml and the new bean
TelelinkCommandBean.java followed by the error dump.
Thank a lot for your help
Best Regards,
Romualdo
------------------------------------------------------------------------------>
camelContext.xml:
TelelinkCommandBean.java:
// TelelinkCommandBean.java
package newlog.camel;
import org.apache.camel.*;
public class TelelinkCommandBean {
private ProducerTemplate producer;
private ConsumerTemplate consumer;
public void setConsumer(ConsumerTemplate consumer) {
this.consumer = consumer;
}
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
public void getTelelinkCommand(Exchange exchange) {
String body = null;
String bodyTelelink = null;
String command = null;
String telelink = null;
// receive the message from payload exchange
body = exchange.getIn().getBody(String.class);
telelink = body.substring(5, 14);
System.out.println("after receiving payload: telelink ID = " +
telelink);
// loop to empty queue
while (true) {
// receive the message from the COMMAND queue
bodyTelelink =
consumer.receiveBody("jms:toTelelinkQ.filter(body().contains("+ telelink
+"))", String.class);
if (bodyTelelink == null) {
// no more messages in queue
break;
}
if (bodyTelelink.contains("#")) {
System.out.println("OK " + telelink + "=" + bodyTelelink);
command = bodyTelelink.substring(10);
} else {
System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
command = bodyTelelink;
}
// send it out the command
exchange.getIn().setBody(command);
}
}
}
Error received at run-time:
[INFO] Classpath = [file:/C:/Documents and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
[pache.camel.spring.Main.main()] MainSupport INFO Apache
Camel 2.4.0 starting
[pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO
Refreshing
org.springframework.context.support.ClassPathXmlApplicationContext@fbcb70:
startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
[pache.camel.spring.Main.main()] XmlBeanDefinitionReader INFO
Loading XML bean definitions from file [C:\Documents and
Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
[pache.camel.spring.Main.main()] CamelNamespaceHandler INFO
camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
[pache.camel.spring.Main.main()] DefaultListableBeanFactory INFO
Pre-instantiating singletons in
org.springframework.beans.factory.support.DefaultListableBeanFactory@302df5:
defining beans
[producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms];
root of factory hierarchy
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
Camel 2.4.0 (CamelContext: Telelink-context) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext INFO
Tracing is enabled on CamelContext: Telelink-context
[pache.camel.spring.Main.main()] DefaultCamelContext INFO JMX
enabled. Using ManagedManagementStrategy.
[pache.camel.spring.Main.main()] AnnotationTypeConverterLoader INFO Found
4 packages with 14 @Converter classes to load
[pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded
148 type converters in 0.311 seconds
[pache.camel.spring.Main.main()] NettyConsumer INFO Netty
consumer bound to: localhost:5000
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route1 started and consuming from: Endpoint[file://src/data]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route2 started and consuming from: Endpoint[tcp://localhost:5000]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route3 started and consuming from: Endpoint[jms://MyQueue]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Route:
route4 started and consuming from: Endpoint[file://target/outputFiles]
[pache.camel.spring.Main.main()] DefaultCamelContext INFO
Started 4 routes
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache
Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
[ New I/O server worker #1-1] Tracer INFO
83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
RECEIVED:
$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
BodyType:String,
Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
after receiving payload: telelink ID = 550700111
[ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
at
newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents
and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
at
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
at
org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
at
org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
[ New I/O server worker #1-1] Tracer INFO
6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyRemoteAddress=/127.0.0.1:2287, CamelNettyMessageEvent=[id:
0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
BodyType:String,
Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
after receiving payload: telelink ID = 550700111
[ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed
delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException.... and more
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807527.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another queue
Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Sep 7, 2010 at 2:04 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Have you some sample using Spring XML?
> With Spring XML seem to me that was impossible make a sort of data transfer
> from one "route" to another: the only samples using filters are built with
> explicit value: i.e. "bar"...
The easiest is to use a bean / Camel Processor and then use
ConsumerTemplate to "drain" the JMS queue for messages with that
special ID.
You can then do this in a loop in the Java code.
See appendix C in the Camel book.
And/or this link
http://camel.apache.org/polling-consumer.html
> Thanks a lot.
> Romualdo
> --
> View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805980.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Re: Messages queue filtered depending on info received by another
queue
Posted by RomualdoGobbo <Ro...@newlog.it>.
Have you some sample using Spring XML?
With Spring XML seem to me that was impossible make a sort of data transfer
from one "route" to another: the only samples using filters are built with
explicit value: i.e. "bar"...
Thanks a lot.
Romualdo
--
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805980.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Messages queue filtered depending on info received by another queue
Posted by Claus Ibsen <cl...@gmail.com>.
You can use JMS message selector to only pickup certain messages from
a JMS queue.
On Tue, Sep 7, 2010 at 12:30 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi All,
> I've the following problem unsolved regarding the filtered messages
> depending on some info extractect from another queue.
>
> The case is the following:
>
> 1. Primary queue contains the messages received from netty:tcp server
> 2.Depending the ID found in the body you have to select from another queue
> only the messages affected to the previous ID, leaving the other messages in
> the queue.
> 3. The returning message will be the message (if found) on the second queue,
> otherwise all remains unchanged.
>
> I've solved only partially the problem using the following camelcontext.xml:
>
> <route>
> <from uri="netty:tcp://localhost:5000?textline=true&sync=true"/>
> <to uri="jms:MyQueue"/>
> <pollEnrich uri="jms:toTelelinkQ" strategyRef="myBeanId"/>
> </route> -->
>
> <bean id="myBeanId" class="org.apache.camel.example.MyBeanProcessor"/>
>
> due to the process myBeanId that is unable to leave the messages in the
> original queue (see the following code), but all messages must be consumed
> to know the body content:
>
> // MyBeanProcessor.java
>
> package org.apache.camel.example;
>
> import java.io.*;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
>
> public class MyBeanProcessor implements AggregationStrategy {
>
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
> if (newExchange == null) {
> // no messages in queue
> return newExchange;
> }
> String payloadOld = oldExchange.getIn().getBody(String.class);
> String payloadNew = newExchange.getIn().getBody(String.class);
>
> String telelink = payloadOld.substring(5, 14);
>
> if (payloadNew.contains("#") & payloadNew.contains(telelink)) {
> System.out.println("OK " + telelink + "=" + payloadNew);
> oldExchange.getIn().setBody(payloadNew.substring(10));
> } else {
> System.out.println("NO OK " + telelink + "<>" + payloadNew);
> oldExchange.getIn().setBody(null);
>
> }
> return oldExchange;
> }
> }
>
> Many thanks for your help,
> Romualdo
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805911.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus