Posted to users@camel.apache.org by Bill Brigden <bi...@dotbill.co.uk> on 2014/06/26 13:49:49 UTC

Problems producing with a consumer..

Hi all,
 
I've got an issue with producing a message inside (and during) the
consumption of another message, i.e.:
 
- A Camel context that has:
  - A route that receives a message (sync) and sends it to a bean function
  - Inside that function, I produce an outgoing message (sync) that is
    destined for another route pointing at a different bean function
 
In more detail:
 
Consumer route (INOUT):
from:jms:queue:....
to:CamelRemoteProxy.consumer
 
Producer route (INOUT):
from:CamelRemoteProxy.producer
to:jms:queue:...
 
In the bean:
 
    @Consume(uri = "direct:CamelRemoteProxy.consumer")
    @SuppressWarnings("unchecked")
    public void camelConsume(Exchange exchange) {

        String jsonIn = exchange.getIn().getBody().toString();

        logger.debug(String.format(
                "Received JSON message from %s: %s",
                exchange.getFromEndpoint().getEndpointUri(),
                jsonIn));

        // The final bean method is invoked by using one of the
        // x-headers to establish which method to call.
    }
 
// In the production...
   @EndpointInject(uri = "direct:CamelRemoteProxy.producer")
    ProducerTemplate producer;
 
    public <T> T invoke(Object payload) {
        String rpcMethod = annotation.methodName();
        String camelURI = String.format("jms:queue:%s.in", rpcMethod);

        logger.info(String.format(
                "Sending RPC envelope (destination URI: %s)", camelURI));
        // Synchronous request/reply: blocks until a reply arrives or times out.
        String response = (String) producer.requestBodyAndHeader(
                camelURI, out.toString(), HEADER_RPC_METHOD, rpcMethod);
        logger.info("Producer returned...");
 
 
// The route registration is done via:
 
camelContext.addRoutes(new ServerRoute(binding.getRpcMethodName()));
 
// With...
 
private class ServerRoute extends RouteBuilder {
 
        private String uri;
        private String rpcMethod;
 
        private ServerRoute(String rpcMethod) {
            this.rpcMethod = rpcMethod;
            logger.info("JMS registered");
            this.uri = String.format("jms:queue:%s.out", rpcMethod);
        }
 
        @Override
        public void configure() throws Exception {
            from(uri).
                    setHeader(HEADER_RPC_METHOD, simple(rpcMethod)).
                    to(camelConsumerEndpoint);
        }
    }
 
 
I'm seeing problems where the message hangs when trying to send via the
ActiveMQConnectionFactory: it waits for the first message to time out,
after which the second message also fails.
 
Are there issues with producing a secondary synchronous INOUT message inside
a primary synchronous INOUT consume?
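
To make the question concrete, here is a plain-Java analogy of the kind of blocking I suspect. It is only a sketch and not Camel or JMS code: the class name NestedRequestDemo, the fixed-size thread pool and the timeout value are all assumptions made for illustration, and I have not confirmed that this is what actually happens in my setup. The outer task stands in for the primary INOUT consume, and the inner task stands in for the secondary INOUT request issued from inside it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: a worker pool stands in for the threads
// that process messages; none of this is Camel or ActiveMQ API.
class NestedRequestDemo {

    // Returns true if the nested (inner) request timed out.
    static boolean nestedRequestTimesOut(int workerThreads, long timeoutMs)
            throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        try {
            // Outer request: occupies one worker, like the consuming bean.
            Future<Boolean> outer = workers.submit(() -> {
                // Inner request: needs another free worker to produce a reply.
                Future<String> inner = workers.submit(() -> "reply");
                try {
                    inner.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return false; // the reply arrived in time
                } catch (TimeoutException e) {
                    inner.cancel(true);
                    return true;  // no free worker, so the inner request starved
                }
            });
            return outer.get();
        } finally {
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 worker, nested request timed out: "
                + nestedRequestTimesOut(1, 200));
        System.out.println("2 workers, nested request timed out: "
                + nestedRequestTimesOut(2, 200));
    }
}
```

With a single worker, the inner request can never be served while the outer one is still waiting for it, so it only ends when the timeout fires. With two workers it completes immediately. Whether something comparable applies to the consumer and reply handling in my routes is exactly what I am unsure about.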
 
Is anyone aware of this being a problem in general, or other ways to solve
it?
 
I've tried setting up separate ActiveMQComponents with different brokers,
using both vm and tcp connections to the broker, and all exhibit the
same problem, as if JMS is blocking when used within Camel.
 
FYI - using Camel 2.13.1 (with ActiveMQ 5.10.0 in a Karaf 2.3.5 container)
 
Thanks in advance,

Bill.