You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by SteveR <sr...@vonage.com> on 2015/10/25 03:50:00 UTC

Camel netty: How to send UDP reply based on NETTY_REMOTE_ADDRESS?

Hi:

I have a Linux RedHat-6 application using *Camel 2.15.2* and *netty3* and I
need to reply to various UDP clients that send UDP requests into my Camel
route.  However, I can't do a blind-ACK. That is, I have to first process
each received UDP request, synchronously produce it to a remote Kafka
cluster, and only when the Exchange is successfully produced to Kafka, then
formulate the associated UDP reply and send it back to the associated UDP
client.

To do this my input route consumes via *netty:udp* on a well-known listen
port, and I then place all received exchanges on a *SEDA_KAFKA_QUEUE* that
consumes them and produces to Kafka.  I'm using the Camel
*onCompletion.onCompleteOnly()* mechanism in this Kafka route to trigger
sending the successfully processed Exchanges to a *SEDA_UDP_ACK_QUEUE* that
will use a processor to formulate the ACK body and then send it as the UDP
reply via the Camel netty component.

I can only get this to work and actually send the ACK if I use a hard-coded
port number in the netty URI. But in production, I'll be receiving from many
different UDP clients and I need to send back to their ephemeral port
number.  I'm looking for a way to parameterize the sending of the UDP
replies back using the *host:port* combo that is specified in the
*CamelNettyRemoteAddress* header of the Exchange.

Any thoughts greatly appreciated.  Below are my routes.

  Thanks, SteveR

	 
from("netty:udp://dloco.m.mission.net:11111?serverPipelineFactory=#MY_SERVER_PIPELINE_FACTORY&amp;keepAlive=true&amp;sync=true&amp;orderedThreadPoolExecutor=false&amp;receiveBufferSize=26214400&amp;sendBufferSize=26214400&amp;allowDefaultCodec=false&amp;disconnectOnNoReply=false&amp;receiveBufferSizePredictor=8192")
			.setExchangePattern(ExchangePattern.InOnly)
			.routeId(sourceRouteId)
			.startupOrder(routeStartupOrder)
			.setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
			.threads(threadPoolSize /*poolSize*/, threadPoolSize * 2
/*maxPoolSize*/).threadName("threads_" + sourceRouteId);
			.to(kafkaQueueURI).id(sourceRouteId + "_TO_KAFKA_QUEUE");

		// ------------------------
		// SEDA_KAFKA_QUEUE
		// ------------------------
		from(kafkaQueueURI)
			.errorHandler(deadLetterChannelBuilder) // Add route-scoped
DeadLetterChannel error handler
			.onCompletion()
				.onCompleteOnly()     // Synchronize only after Exchange completes
successfully with no errors
				.parallelProcessing() // Tells Camel to use a thread pool for
onCompletion route
				.to(ackQueueURI).id(sourceRouteId + "_ON_COMPLETION_ONLY_TO_ACK_QUEUE")
			.end() // Must use end() to denote the end of the onCompletion route
			.setExchangePattern(ExchangePattern.InOnly)
			.routeId(kafkaQueueRouteId)
			.startupOrder(kafkaQueueRouteStartupOrder)
			.setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
			.threads(threadPoolSize /*poolSize*/, threadPoolSize * 2
/*maxPoolSize*/).threadName("threads_" + kafkaQueueRouteId)
			// ------------------------------------------------
			// This processor handles Kafka related processing.
			// For example, determining the Kafka partitioning.
			// ------------------------------------------------
			.process(kafkaProcessor)
				.id(kafkaProcessorId)
			// ---------------------------------------------------
			// Here we route to the final destination (e.g. Kafka)
			// ---------------------------------------------------
			.to(kafkaToURI);

		// ----------------------------
		// SEDA_UDP_ACK_QUEUE
		// ----------------------------
		from(ackQueueURI)
			.setExchangePattern(ExchangePattern.InOut)
			.routeId(ackQueueRouteId)
			.startupOrder(ackQueueRouteStartupOrder)
			.setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
			.process(ackBackProcessor)
				.id(ackBackProcessorId)
		
.to("netty:udp://dloco.m.mission.net:9998?clientPipelineFactory=#MY_CLIENT_PIPELINE_FACTORY&sendBufferSize=26214400&allowDefaultCodec=false");




--
View this message in context: http://camel.465427.n5.nabble.com/Camel-netty-How-to-send-UDP-reply-based-on-NETTY-REMOTE-ADDRESS-tp5773044.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel netty: How to send UDP reply based on NETTY_REMOTE_ADDRESS?

Posted by SteveR <sr...@vonage.com>.
Below is what I found that seems to work, with one issue that I can't yet
figure out:

After sending some test data into my application, and when I subsequently
use *VisualVM* to examine this Camel JMX MBean route, it always shows all
exchanges as *InflightExchanges* (i.e. the exchanges never show as
CompletedExchanges).

Any thoughts on how to debug this issue?

  Thanks, SteveR


        // I extract the remote address in my processor, get the remote host
and remote port,
        // and set them as input headers on the exchange.

	Message inMsg = exchange.getIn();
	String remoteAddress =
inMsg.getHeader(NettyConstants.NETTY_REMOTE_ADDRESS).toString();
	String[] remoteAddressParts = remoteAddress.split(":");
	String remoteHost = remoteAddressParts[0].replace("/", "");
	String remotePort = remoteAddressParts[1];
	inMsg.setHeader("REMOTE_HOST_IP", remoteHost);
	inMsg.setHeader("REMOTE_PORT_NUMBER", remotePort);


        // This in my route, I use recipientList EIP and the simple language
to dynamically add them to the netty URI.

	from(ackQueueURI)
		.setExchangePattern(ExchangePattern.InOnly)
		.routeId(ackQueueRouteId)
		.startupOrder(ackQueueRouteStartupOrder)
		.setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
		.threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 /*maxPoolSize*/)
			.threadName("threads_" + sourceRouteId + "_ACK_QUEUE")
		.process(cqmsAckBackProcessor)
			.id(cqmsAckBackProcessorId)
		.recipientList( // See
http://camel.apache.org/how-do-i-use-dynamic-uri-in-to.html
			simple(
				"netty:udp://${header.REMOTE_HOST_IP}:" +
				"${header.REMOTE_PORT_NUMBER}?" +
			
"clientPipelineFactory=#CLIENT_PIPELINE_FACTORY_ROUTE_ID_RAW_CQMS_EVENTS&" +
				"sendBufferSize=26214400&allowDefaultCodec=false"
			)
		)
	.end();



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-netty-How-to-send-UDP-reply-based-on-NETTY-REMOTE-ADDRESS-tp5773044p5773077.html
Sent from the Camel - Users mailing list archive at Nabble.com.