Posted to dev@camel.apache.org by RomualdoGobbo <Ro...@newlog.it> on 2010/09/07 12:30:53 UTC

Messages queue filtered depending on info received by another queue

Hi All,
I have an unsolved problem: filtering the messages of one queue
depending on information extracted from another queue.

The case is the following:

1. The primary queue contains the messages received from the netty:tcp server.
2. Depending on the ID found in the body, only the messages related to that ID
must be selected from a second queue, leaving the other messages in that
queue.
3. The returned message is the message found on the second queue (if any);
otherwise everything remains unchanged.
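
The three requirements above can be sketched in plain Java, without Camel or JMS. This is only an in-memory model: the second queue is a Deque, and the command format "ID#COMMAND" and the class and method names are assumptions made for illustration, not part of the original setup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

// In-memory model of the requirement; names and message format are hypothetical.
final class SelectiveDequeue {

    /** Removes and returns the first queued command that mentions the ID; all others stay queued. */
    static Optional<String> takeFirstMatching(Deque<String> commandQueue, String telelinkId) {
        for (Iterator<String> it = commandQueue.iterator(); it.hasNext();) {
            String command = it.next();
            if (command.contains(telelinkId)) {
                it.remove(); // only the matching message leaves the queue
                return Optional.of(command);
            }
        }
        return Optional.empty(); // nothing matched: the queue is untouched
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("550700222#RESET");
        queue.add("550700111#STATUS");

        Optional<String> hit = takeFirstMatching(queue, "550700111");
        System.out.println(hit.orElse("<unchanged>") + " / still queued: " + queue);
    }
}
```

A real JMS queue does not offer this kind of iteration to a consumer, which is why the thread moves on to selectors.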

I've solved the problem only partially, using the following camelcontext.xml:

	<route>
    	<from uri="netty:tcp://localhost:5000?textline=true&amp;sync=true"/>
    	<to uri="jms:MyQueue"/>
 	    <pollEnrich uri="jms:toTelelinkQ" strategyRef="myBeanId"/>
	</route>

	<bean id="myBeanId" class="org.apache.camel.example.MyBeanProcessor"/>

The solution is only partial because the myBeanId strategy cannot leave the
messages in the original queue (see the following code): every message must
be consumed in order to inspect its body:

// MyBeanProcessor.java

package org.apache.camel.example;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyBeanProcessor implements AggregationStrategy {

	public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

		if (newExchange == null) {
			// no messages in queue: leave the original exchange unchanged
			return oldExchange;
		}
		String payloadOld = oldExchange.getIn().getBody(String.class);
		String payloadNew = newExchange.getIn().getBody(String.class);

		String telelink = payloadOld.substring(5, 14);

		if (payloadNew.contains("#") && payloadNew.contains(telelink)) {
			System.out.println("OK " + telelink + "=" + payloadNew);
			oldExchange.getIn().setBody(payloadNew.substring(10));
		} else {
			System.out.println("NO OK " + telelink + "<>" + payloadNew);
			oldExchange.getIn().setBody(null);

		}
		return oldExchange;
	}
}
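
The strategy above takes the ID with substring(5, 14), which relies on a fixed "$DCS," prefix and a nine-character ID. As a hedged alternative, assuming the ID is always the second comma-separated field (as in the payloads shown in the error dump of this thread), the extraction could be sketched like this. The class and method names are hypothetical.

```java
// Sketch only: assumes the telelink ID is the second comma-separated field.
final class TelelinkPayload {

    /** Returns the second comma-separated field, or null if the payload has no such field. */
    static String extractId(String payload) {
        if (payload == null) {
            return null;
        }
        // Limit 3: we only need the first two fields, the rest stays unsplit.
        String[] fields = payload.split(",", 3);
        return fields.length >= 2 && !fields[1].isEmpty() ? fields[1] : null;
    }

    public static void main(String[] args) {
        String sample = "$DCS,550700111,100908081220,1,45.57576,9.39887";
        System.out.println("by field:     " + extractId(sample));
        System.out.println("by substring: " + sample.substring(5, 14));
    }
}
```

Unlike substring, this returns null for a short or malformed payload instead of throwing StringIndexOutOfBoundsException.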

Many thanks for your help,
Romualdo
 
-- 
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805911.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
ERROR DUMP:
------------------------------------------------------------->


[INFO] Classpath = [file:/C:/Documents and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
[pache.camel.spring.Main.main()] MainSupport                    INFO  Apache
Camel 2.4.0 starting
[pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO 
Refreshing
org.springframework.context.support.ClassPathXmlApplicationContext@fbcb70:
startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
[pache.camel.spring.Main.main()] XmlBeanDefinitionReader        INFO 
Loading XML bean definitions from file [C:\Documents and
Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
[pache.camel.spring.Main.main()] CamelNamespaceHandler          INFO 
camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
[pache.camel.spring.Main.main()] DefaultListableBeanFactory     INFO 
Pre-instantiating singletons in
org.springframework.beans.factory.support.DefaultListableBeanFactory@302df5:
defining beans
[producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms];
root of factory hierarchy
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.4.0 (CamelContext: Telelink-context) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO 
Tracing is enabled on CamelContext: Telelink-context
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  JMX
enabled. Using ManagedManagementStrategy.
[pache.camel.spring.Main.main()] AnnotationTypeConverterLoader  INFO  Found
4 packages with 14 @Converter classes to load
[pache.camel.spring.Main.main()] DefaultTypeConverter           INFO  Loaded
148 type converters in 0.311 seconds
[pache.camel.spring.Main.main()] NettyConsumer                  INFO  Netty
consumer bound to: localhost:5000
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route1 started and consuming from: Endpoint[file://src/data]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route2 started and consuming from: Endpoint[tcp://localhost:5000]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route3 started and consuming from: Endpoint[jms://MyQueue]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route4 started and consuming from: Endpoint[file://target/outputFiles]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO 
Started 4 routes
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
[    New I/O server worker #1-1] Tracer                         INFO 
83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
RECEIVED:
$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
BodyType:String,
Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
after receiving payload: telelink ID = 550700111
[    New I/O server worker #1-1] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
	at
newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents
and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
	at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
	at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
	at
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
	at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
	at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
[    New I/O server worker #1-1] Tracer                         INFO 
6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyRemoteAddress=/127.0.0.1:2287, CamelNettyMessageEvent=[id:
0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
BodyType:String,
Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
after receiving payload: telelink ID = 550700111
[    New I/O server worker #1-1] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException

Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
// TelelinkCommandBean.java

package newlog.camel;

import org.apache.camel.*;

public class TelelinkCommandBean {
    private ProducerTemplate producer;
    private ConsumerTemplate consumer;

    public void setConsumer(ConsumerTemplate consumer) {
        this.consumer = consumer;
    }

    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public void getTelelinkCommand(Exchange exchange) {
        String body = null;
        String bodyTelelink = null;
        String command = null;
        String telelink = null;

        // receive the message from payload exchange
        body = exchange.getIn().getBody(String.class);
        telelink = body.substring(5, 14);
        System.out.println("after receiving payload: telelink ID = " + telelink);


        // loop to empty queue
        while (true) {

            // receive the message from the COMMAND queue
            bodyTelelink = consumer.receiveBody("jms:toTelelinkQ.filter(body().contains(" + telelink + "))", String.class);
            if (bodyTelelink == null) {
                // no more messages in queue
                break;
            }

            if (bodyTelelink.contains("#")) {
                System.out.println("OK " + telelink + "=" + bodyTelelink);
                command = bodyTelelink.substring(10);
            } else {
                System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
                command = bodyTelelink;
            }
            // send it out the command
            exchange.getIn().setBody(command);
        }
    }
}
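
One possible reading of the NullPointerException in getTelelinkCommand: the camelContext.xml posted in this thread defines myBeanId without any property elements, so setConsumer may never be called and the consumer field would still be null when receiveBody is invoked. The sketch below only illustrates that failure mode and a fail-fast guard. CommandSource is a hypothetical stand-in for ConsumerTemplate, and the diagnosis itself is an assumption, not something confirmed in the thread.

```java
import java.util.Objects;

// Illustration of a dependency that is never injected; all names are hypothetical.
final class WiringCheck {

    /** Hypothetical stand-in for the ConsumerTemplate dependency. */
    interface CommandSource {
        String receive();
    }

    static final class CommandBean {
        private CommandSource consumer; // stays null unless the setter is called

        void setConsumer(CommandSource consumer) {
            this.consumer = Objects.requireNonNull(consumer, "consumer");
        }

        String nextCommand() {
            if (consumer == null) {
                // Clearer than the bare NullPointerException a direct call would raise.
                throw new IllegalStateException("consumer was never injected");
            }
            return consumer.receive();
        }
    }

    public static void main(String[] args) {
        CommandBean bean = new CommandBean();
        try {
            bean.nextCommand();
        } catch (IllegalStateException e) {
            System.out.println("before wiring: " + e.getMessage());
        }
        bean.setConsumer(() -> "550700111#STATUS");
        System.out.println("after wiring:  " + bean.nextCommand());
    }
}
```

In Spring XML the wiring would normally be a property element on the bean definition that references the consumer template by its id; whether that resolves the error here is untested.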


Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
camelContext.xml:
-------------------------------------------------------------------------->


<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
  
  <!-- START SNIPPET: example -->
  <camelContext id="Telelink-context" trace="true"
xmlns="http://camel.apache.org/schema/spring">
	<template id="producer" />
	<consumerTemplate id="consumer" />

	<route>
<!--    <from uri="file:src/data?noop=true"/>  -->
		<from uri="file:src/data"/>
		<to uri="jms:toTelelinkQ"/>
	</route>

 <!-- OPEN Server Socket and send them to internal bean   -->
	<route>
 <!--     	<from uri="timer://foo?period=5000"/>     -->
	<from uri="netty:tcp://localhost:5000?textline=true&amp;sync=true"/>
  		<bean ref="myBeanId" method="getTelelinkCommand"/>
  		<to uri="jms:MyQueue"/>
	</route>

    <!-- now command integration and lets write messages from the queue to a
directory     -->   
    <route>
  		<from uri="jms:MyQueue"/>
		<to uri="file:target/outputFiles"/>
 	</route>

    <!-- now lets write messages from the directory to some queues directory
-->      
    <route>
  		<from uri="file:target/outputFiles"/>
		<to uri="jms:toDroolsQ"/>
		<to uri="jms:toTrackingQ"/>
 	</route>
   
  </camelContext>
  

	<!-- lets configure Bean -->
	<bean id="myBeanId" class="newlog.camel.TelelinkCommandBean"/>

	<!-- lets configure the default ActiveMQ broker URL -->
	<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
		<property name="connectionFactory">
		<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- 			<property name="brokerURL"
value="vm://localhost?broker.persistent=false"/>  -->
				<property name="brokerURL" value="tcp://localhost:61616"/>  
		</bean>
    	</property>
  	</bean>
  <!-- END SNIPPET: example -->

</beans>

Re: Messages queue filtered depending on info received by another queue

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 8, 2010 at 2:22 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi,
> I'm using the camel 2.4.0 version
> Romualdo

I created a unit test you can take a look at:
http://svn.apache.org/viewvc?rev=995371&view=rev




-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi,
I'm using the camel 2.4.0 version
Romualdo

Re: Messages queue filtered depending on info received by another queue

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 8, 2010 at 2:13 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi,
> I get the same exception as above, even when I use a consumer
> without any kind of selector (see below).
> I think that, because of some mistake of mine, the consumer is not instantiated,
> but I can't find the solution.
> If I exclude the line with "consumer.receiveBody", everything works!
>

Ah we fixed something for JMS selectors with consumer template a while back.
Maybe you need a newer version of Camel.


> Many thanks for your help,
> Best Regards,
> Romualdo
>




Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi,
I get the same exception as above, even when I use a consumer
without any kind of selector (see below).
I think that, because of some mistake of mine, the consumer is not instantiated,
but I can't find the solution.
If I exclude the line with "consumer.receiveBody", everything works!

Many thanks for your help,
Best Regards,
Romualdo



--------------------- BEAN---------------------------->
// TelelinkCommandBean.java

package newlog.camel;

import org.apache.camel.*;


public class TelelinkCommandBean {
    private ProducerTemplate producer;
    private ConsumerTemplate consumer;

    public void setConsumer(ConsumerTemplate consumer) {
        this.consumer = consumer;
    }
    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public void getTelelinkCommand(Exchange exchange) {
        String body = null;
        String bodyTelelink = null;
        String command = null;
        String telelink = null;

        // receive the message from payload exchange
        body = exchange.getIn().getBody(String.class);
        telelink = body.substring(5, 14);
        System.out.println("after receiving payload: telelink ID = " + telelink);


        // loop to empty queue
        while (true) {

            // receive the message from the COMMAND queue
            bodyTelelink = consumer.receiveBody("file:src/data", String.class);
            if (bodyTelelink == null) {
                // no more messages in queue
                break;
            }

            if (bodyTelelink.contains("#")) {
                System.out.println("OK " + telelink + "=" + bodyTelelink);
                command = bodyTelelink.substring(10);
            } else {
                System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
                command = bodyTelelink;
            }
            // send it out the command
            exchange.getIn().setBody(command);
        }
    }
}
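
A separate point about the "loop to empty queue" above: it exits only when the receive call returns null, which requires a receive that gives up after a while instead of waiting indefinitely. The plain-Java sketch below models that with BlockingQueue.poll and a timeout. The queue, class name and timeout value are assumptions for illustration. ConsumerTemplate also offers receiveBody overloads that take a timeout and a receiveBodyNoWait variant, which may be the closer equivalent in Camel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Models a drain loop that terminates once no message arrives within the timeout.
final class DrainWithTimeout {

    static List<String> drain(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String next;
            // poll returns null when the timeout elapses, which ends the loop
            while ((next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("550700111#STATUS");
        queue.add("550700111#RESET");
        System.out.println("drained: " + drain(queue, 50) + ", left: " + queue.size());
    }
}
```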

-------------------EXCEPTION ERROR----------------->

[    New I/O server worker #1-1] Tracer                         INFO 
38923066-b464-45fb-9a8e-0b63e45a38d9 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyMessageEvent=[id: 0x012d9844, /127.0.0.1:3013 =>
/127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10,
CamelNettyRemoteAddress=/127.0.0.1:3013,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@16c14e7},
BodyType:String,
Body:$DCS,550700111,100908120155,1,45.57075,9.36793,78,156.5,123456,067F,0,0,,1,00001,000,12.7,214,583,0A10
after receiving payload: telelink ID = 550700111
[    New I/O server worker #1-1] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: 38923066-b464-45fb-9a8e-0b63e45a38d9. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException

Re: Messages queue filtered depending on info received by another queue

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

You have to use valid JMS selector syntax. This is standard JMS.
So google for information on how to do that with JMS. It's a bit limited what
you can do.

For example, for ActiveMQ:
http://activemq.apache.org/selectors.html
http://download.oracle.com/javaee/1.4/api/javax/jms/Message.html
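
To make the selector advice concrete: a JMS selector matches message headers and properties, not the body, so the sender would have to copy the ID into a property when it puts the message on the queue. The sketch below only builds the endpoint URI. The property name telelinkId and the class TelelinkSelector are hypothetical, and it assumes the selector option of the camel-jms component, with special characters such as = percent-encoded.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds a Camel JMS endpoint URI carrying a JMS selector.
class TelelinkSelector {

    // Assumes the sender stored the ID in a String property named "telelinkId".
    static String endpointFor(String queue, String telelinkId)
            throws UnsupportedEncodingException {
        // Accept digits only so the value cannot break out of the quoted literal.
        if (telelinkId == null || !telelinkId.matches("\\d+")) {
            throw new IllegalArgumentException("unexpected telelink ID: " + telelinkId);
        }
        String selector = "telelinkId='" + telelinkId + "'";
        // '=' and the quotes are percent-encoded so they survive URI parsing.
        return "jms:" + queue + "?selector=" + URLEncoder.encode(selector, "UTF-8");
    }

    public static void main(String[] args) throws Exception {
        // prints jms:toTelelinkQ?selector=telelinkId%3D%27550700111%27
        System.out.println(endpointFor("toTelelinkQ", "550700111"));
    }
}
```

The resulting string could then be passed to a call such as consumer.receiveBody(uri, timeout, String.class). With a selector in place, messages for other IDs should stay on the queue instead of being consumed.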

On Wed, Sep 8, 2010 at 10:30 AM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi Claus,
> thank you very much for your suggestions.
> I've completely reviewed my code, but unfortunately I'm still making a
> mistake that I can't find: the message body doesn't return to the main
> context, as you can see in the code below, because of an exception.
>
> You can find below the new camelContext.xml and the new bean
> TelelinkCommandBean.java, followed by the error dump.
>
> Thanks a lot for your help
>
> Best Regards,
> Romualdo
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
Hi Claus,
thank you very much for your suggestions.
I've completely reviewed my code, but unfortunately I'm still making a
mistake that I can't find: the message body doesn't return to the main
context, as you can see in the code below, because of an exception.

You can find below the new camelContext.xml and the new bean
TelelinkCommandBean.java, followed by the error dump.

Thanks a lot for your help

Best Regards,
Romualdo

------------------------------------------------------------------------------>
camelContext.xml:

TelelinkCommandBean.java:

// TelelinkCommandBean.java

package newlog.camel;

import org.apache.camel.*;

public class TelelinkCommandBean {
    private ProducerTemplate producer;
    private ConsumerTemplate consumer;

    public void setConsumer(ConsumerTemplate consumer) {
        this.consumer = consumer;
    }

    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public void getTelelinkCommand(Exchange exchange) {
        String body = null;
        String bodyTelelink = null;
        String command = null;
        String telelink = null;

        // read the incoming payload from the exchange
        body = exchange.getIn().getBody(String.class);
        telelink = body.substring(5, 14);
        System.out.println("after receiving payload: telelink ID = " + telelink);


        // loop to empty queue
        while (true) {

            // receive the message from the COMMAND queue
            bodyTelelink = consumer.receiveBody("jms:toTelelinkQ.filter(body().contains(" + telelink + "))", String.class);
            if (bodyTelelink == null) {
                // no more messages in queue
                break;
            }

            if (bodyTelelink.contains("#")) {
                System.out.println("OK " + telelink + "=" + bodyTelelink);
                command = bodyTelelink.substring(10);
            } else {
                System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
                command = bodyTelelink;
            }
            // set the command as the new message body
            exchange.getIn().setBody(command);
        }
    }
}
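
The bean above reads the ID with fixed offsets, body.substring(5, 14), which works as long as every record starts with "$DCS," followed by a nine-digit ID. As a sketch under that assumption, the same value can be read as the second comma-separated field, which would not depend on the ID's length. The class name TelelinkId is hypothetical, and the sample record is a shortened copy of the one in the trace.

```java
// Hypothetical helper: reads the ID as the second comma-separated field.
class TelelinkId {

    static String extract(String body) {
        if (body == null) {
            throw new IllegalArgumentException("empty payload");
        }
        String[] fields = body.split(",");
        // Reject anything that does not look like a $DCS record.
        if (fields.length < 2 || !fields[0].equals("$DCS")) {
            throw new IllegalArgumentException("not a $DCS record: " + body);
        }
        return fields[1];
    }

    public static void main(String[] args) {
        // Shortened sample record; the real payload has more fields.
        String body = "$DCS,550700111,100908081220,1,45.57576,9.39887";
        System.out.println(extract(body));          // prints 550700111
        System.out.println(body.substring(5, 14));  // prints 550700111
    }
}
```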

Error received at run-time:

[INFO] Classpath = [file:/C:/Documents and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
file:/C:/Documents and
Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
[pache.camel.spring.Main.main()] MainSupport                    INFO  Apache
Camel 2.4.0 starting
[pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO 
Refreshing
org.springframework.context.support.ClassPathXmlApplicationContext@fbcb70:
startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
[pache.camel.spring.Main.main()] XmlBeanDefinitionReader        INFO 
Loading XML bean definitions from file [C:\Documents and
Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
[pache.camel.spring.Main.main()] CamelNamespaceHandler          INFO 
camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
[pache.camel.spring.Main.main()] DefaultListableBeanFactory     INFO 
Pre-instantiating singletons in
org.springframework.beans.factory.support.DefaultListableBeanFactory@302df5:
defining beans
[producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms];
root of factory hierarchy
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.4.0 (CamelContext: Telelink-context) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO 
Tracing is enabled on CamelContext: Telelink-context
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  JMX
enabled. Using ManagedManagementStrategy.
[pache.camel.spring.Main.main()] AnnotationTypeConverterLoader  INFO  Found
4 packages with 14 @Converter classes to load
[pache.camel.spring.Main.main()] DefaultTypeConverter           INFO  Loaded
148 type converters in 0.311 seconds
[pache.camel.spring.Main.main()] NettyConsumer                  INFO  Netty
consumer bound to: localhost:5000
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route1 started and consuming from: Endpoint[file://src/data]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route2 started and consuming from: Endpoint[tcp://localhost:5000]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route3 started and consuming from: Endpoint[jms://MyQueue]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Route:
route4 started and consuming from: Endpoint[file://target/outputFiles]
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO 
Started 4 routes
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
[    New I/O server worker #1-1] Tracer                         INFO 
83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
RECEIVED:
$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
BodyType:String,
Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
after receiving payload: telelink ID = 550700111
[    New I/O server worker #1-1] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
	at
newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents
and
Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
	at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
	at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
	at
org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
	at
org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
	at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
	at
org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
	at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
[    New I/O server worker #1-1] Tracer                         INFO 
6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
--> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
Headers:{CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@fdbc27,
CamelNettyRemoteAddress=/127.0.0.1:2287, CamelNettyMessageEvent=[id:
0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000] RECEIVED:
$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
BodyType:String,
Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
after receiving payload: telelink ID = 550700111
[    New I/O server worker #1-1] DefaultErrorHandler            ERROR Failed
delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted
after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException.... and more

-- 
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807527.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: Messages queue filtered depending on info received by another queue

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Sep 7, 2010 at 2:04 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Do you have a sample using Spring XML?
> With Spring XML it seems impossible to transfer data from one "route" to
> another: the only filter samples I found use an explicit value,
> e.g. "bar"...

The easiest way is to use a bean or a Camel Processor and then use a
ConsumerTemplate to "drain" the JMS queue of the messages with that
special ID.
You can do this in a loop in the Java code.

See appendix C in the Camel book.

And/or this link
http://camel.apache.org/polling-consumer.html
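
A minimal sketch of such a drain loop, under some assumptions: the class
name QueueDrainer and the Supplier-based shape are only illustrative, and
the code uses Java 8 lambdas rather than the Java 6 shown in the stack
trace above. In a Camel bean the supplier would wrap a timed receive such
as consumerTemplate.receiveBody(uri, 1000, String.class), which returns
null when nothing arrives within the timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class QueueDrainer {

    /**
     * Polls until the supplier returns null (nothing left to receive)
     * and returns every body that was received, in arrival order.
     */
    static List<String> drain(Supplier<String> poll) {
        List<String> received = new ArrayList<>();
        String body;
        // A null body is treated as "queue is empty for now".
        while ((body = poll.get()) != null) {
            received.add(body);
        }
        return received;
    }
}
```

Note that every message received this way is consumed. To leave the other
messages in the queue, the endpoint URI polled by the supplier would also
need a selector, so that only messages for the wanted ID are picked up.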

> Thanks a lot.
> Romualdo



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Messages queue filtered depending on info received by another queue

Posted by RomualdoGobbo <Ro...@newlog.it>.
Do you have a sample using Spring XML?
With Spring XML it seems impossible to transfer data from one "route" to
another: the only filter samples I found use an explicit value,
e.g. "bar"...
Thanks a lot.
Romualdo
-- 
View this message in context: http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2805980.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: Messages queue filtered depending on info received by another queue

Posted by Claus Ibsen <cl...@gmail.com>.
You can use a JMS message selector to pick up only certain messages from
a JMS queue.
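
A rough sketch of how the selector could be built from the incoming
payload, with several assumptions: JMS selectors match on message headers
and properties, not on the body, so the messages on toTelelinkQ would need
to carry the ID in a property. The property name telelinkId and the class
name TelelinkSelector are hypothetical. The ID is taken as the second
comma-separated field of the payload, as in the $DCS lines of the log.

```java
final class TelelinkSelector {

    /** Extracts the second comma-separated field, e.g. 550700111 from "$DCS,550700111,...". */
    static String extractId(String payload) {
        String[] fields = payload.split(",");
        // Accept digits only, so the value is safe to embed in a selector.
        if (fields.length < 2 || !fields[1].matches("\\d+")) {
            throw new IllegalArgumentException("No numeric ID in payload: " + payload);
        }
        return fields[1];
    }

    /** Builds a JMS endpoint URI whose selector matches only messages for this ID. */
    static String endpointUri(String payload) {
        // '=' is written as %3D so that it survives the endpoint URI parsing.
        return "jms:toTelelinkQ?selector=telelinkId%3D'" + extractId(payload) + "'";
    }
}
```

The resulting URI could then be passed to a ConsumerTemplate timed receive,
so that messages for other IDs stay in the queue.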


On Tue, Sep 7, 2010 at 12:30 PM, RomualdoGobbo <Ro...@newlog.it> wrote:
>
> Hi All,
> I have an unsolved problem with filtering messages depending on some
> info extracted from another queue.
>
> The case is the following:
>
> 1. Primary queue contains the messages received from netty:tcp server
> 2. Depending on the ID found in the body, you have to select from another
> queue only the messages related to that ID, leaving the other messages in
> the queue.
> 3. The returned message will be the message (if found) on the second queue,
> otherwise everything remains unchanged.
>
> I've solved only partially the problem using the following camelcontext.xml:
>
>        <route>
>        <from uri="netty:tcp://localhost:5000?textline=true&amp;sync=true"/>
>        <to uri="jms:MyQueue"/>
>            <pollEnrich uri="jms:toTelelinkQ" strategyRef="myBeanId"/>
>        </route>  -->
>
>        <bean id="myBeanId" class="org.apache.camel.example.MyBeanProcessor"/>
>
> The solution is only partial because the myBeanId bean is unable to leave
> the messages in the original queue (see the following code): every message
> must be consumed to know its body content:
>
> // MyBeanProcessor.java
>
> package org.apache.camel.example;
>
> import java.io.*;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
>
> public class MyBeanProcessor implements AggregationStrategy {
>
>        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>                if (newExchange == null) {
>                        // no message was polled from the queue: keep the original
>                        return oldExchange;
>                }
>                String payloadOld = oldExchange.getIn().getBody(String.class);
>                String payloadNew = newExchange.getIn().getBody(String.class);
>
>                String telelink = payloadOld.substring(5, 14);
>
>                if (payloadNew.contains("#") && payloadNew.contains(telelink)) {
>                        System.out.println("OK " + telelink + "=" + payloadNew);
>                        oldExchange.getIn().setBody(payloadNew.substring(10));
>                } else {
>                        System.out.println("NO OK " + telelink + "<>" + payloadNew);
>                        oldExchange.getIn().setBody(null);
>
>                }
>                return oldExchange;
>        }
> }
>
> Many thanks for your help,
> Romualdo
>


