You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/24 17:40:28 UTC
svn commit: r1329798 - in /camel/trunk/components/camel-netty/src:
main/java/org/apache/camel/component/netty/handlers/
test/java/org/apache/camel/component/netty/
Author: davsclaus
Date: Tue Apr 24 15:40:27 2012
New Revision: 1329798
URL: http://svn.apache.org/viewvc?rev=1329798&view=rev
Log:
CAMEL-5211: Netty consumer now leverages async non blocking routing engine.
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java
- copied, changed from r1329730, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1329798&r1=1329797&r2=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Tue Apr 24 15:40:27 2012
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Logger LOG = LoggerFactory.getLogger(ClientChannelHandler.class);
private final NettyProducer producer;
- private boolean messageReceived;
+ private volatile boolean messageReceived;
private volatile boolean exceptionHandled;
public ClientChannelHandler(NettyProducer producer) {
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1329798&r1=1329797&r2=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Tue Apr 24 15:40:27 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.netty.handlers;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.component.netty.NettyConstants;
@@ -70,12 +71,12 @@ public class ServerChannelHandler extend
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) throws Exception {
Object in = messageEvent.getMessage();
LOG.debug("Incoming message: {}", in);
// create Exchange and let the consumer process it
- Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+ final Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
if (consumer.getConfiguration().isSync()) {
exchange.setPattern(ExchangePattern.InOut);
}
@@ -84,16 +85,39 @@ public class ServerChannelHandler extend
exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(consumer.getConfiguration().getCharsetName()));
}
+ // process accordingly to endpoint configuration
+ if (consumer.getEndpoint().isSynchronous()) {
+ processSynchronously(exchange, messageEvent);
+ } else {
+ processAsynchronously(exchange, messageEvent);
+ }
+ }
+
+ private void processSynchronously(final Exchange exchange, final MessageEvent messageEvent) {
try {
consumer.getProcessor().process(exchange);
+ if (consumer.getConfiguration().isSync()) {
+ sendResponse(messageEvent, exchange);
+ }
} catch (Throwable e) {
consumer.getExceptionHandler().handleException(e);
}
+ }
- // send back response if the communication is synchronous
- if (consumer.getConfiguration().isSync()) {
- sendResponse(messageEvent, exchange);
- }
+ private void processAsynchronously(final Exchange exchange, final MessageEvent messageEvent) {
+ consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ // send back response if the communication is synchronous
+ try {
+ if (consumer.getConfiguration().isSync()) {
+ sendResponse(messageEvent, exchange);
+ }
+ } catch (Throwable e) {
+ consumer.getExceptionHandler().handleException(e);
+ }
+ }
+ });
}
private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {
Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java (from r1329730, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java&r1=1329730&r2=1329798&rev=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java Tue Apr 24 15:40:27 2012
@@ -24,13 +24,15 @@ import org.junit.Test;
/**
* @version
*/
-public class NettyTextlineInOutSynchronousFalseTest extends BaseNettyTest {
+public class NettyTextlineInOutNonBlockingTest extends BaseNettyTest {
private static String beforeThreadName;
private static String afterThreadName;
+ private static String beforeThreadName2;
+ private static String afterThreadName2;
@Test
- public void testSynchronous() throws Exception {
+ public void testNonBlocking() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
String reply = template.requestBody("direct:start", "Hello World", String.class);
@@ -39,6 +41,7 @@ public class NettyTextlineInOutSynchrono
assertMockEndpointsSatisfied();
assertFalse("Should not same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should not same threads", beforeThreadName2.equalsIgnoreCase(afterThreadName2));
}
@Override
@@ -53,7 +56,7 @@ public class NettyTextlineInOutSynchrono
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&synchronous=false")
+ .to("netty:tcp://localhost:{{port}}?textline=true&sync=true")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -62,9 +65,21 @@ public class NettyTextlineInOutSynchrono
.to("log:after")
.to("mock:result");
- from("netty:tcp://localhost:{{port}}?textline=true&sync=true&synchronous=false")
+ from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName2 = Thread.currentThread().getName();
+ }
+ })
// body should be a String when using textline codec
.validate(body().isInstanceOf(String.class))
+ // async delayed is non blocking
+ .delay(100).asyncDelayed()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName2 = Thread.currentThread().getName();
+ }
+ })
.transform(body().regexReplaceAll("Hello", "Bye"));
}
};