You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/24 17:40:28 UTC

svn commit: r1329798 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/handlers/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Tue Apr 24 15:40:27 2012
New Revision: 1329798

URL: http://svn.apache.org/viewvc?rev=1329798&view=rev
Log:
CAMEL-5211: The Netty consumer now leverages the asynchronous, non-blocking routing engine.

Added:
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java
      - copied, changed from r1329730, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1329798&r1=1329797&r2=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Tue Apr 24 15:40:27 2012
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Logger LOG = LoggerFactory.getLogger(ClientChannelHandler.class);
     private final NettyProducer producer;
-    private boolean messageReceived;
+    private volatile boolean messageReceived;
     private volatile boolean exceptionHandled;
 
     public ClientChannelHandler(NettyProducer producer) {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1329798&r1=1329797&r2=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Tue Apr 24 15:40:27 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty.handlers;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.netty.NettyConstants;
@@ -70,12 +71,12 @@ public class ServerChannelHandler extend
     }
     
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) throws Exception {
         Object in = messageEvent.getMessage();
         LOG.debug("Incoming message: {}", in);
 
         // create Exchange and let the consumer process it
-        Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+        final Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
@@ -84,16 +85,39 @@ public class ServerChannelHandler extend
             exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(consumer.getConfiguration().getCharsetName()));
         }
 
+        // process according to the endpoint configuration
+        if (consumer.getEndpoint().isSynchronous()) {
+            processSynchronously(exchange, messageEvent);
+        } else {
+            processAsynchronously(exchange, messageEvent);
+        }
+    }
+
+    private void processSynchronously(final Exchange exchange, final MessageEvent messageEvent) {
         try {
             consumer.getProcessor().process(exchange);
+            if (consumer.getConfiguration().isSync()) {
+                sendResponse(messageEvent, exchange);
+            }
         } catch (Throwable e) {
             consumer.getExceptionHandler().handleException(e);
         }
+    }
 
-        // send back response if the communication is synchronous
-        if (consumer.getConfiguration().isSync()) {
-            sendResponse(messageEvent, exchange);
-        }
+    private void processAsynchronously(final Exchange exchange, final MessageEvent messageEvent) {
+        consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                // send back response if the communication is synchronous
+                try {
+                    if (consumer.getConfiguration().isSync()) {
+                        sendResponse(messageEvent, exchange);
+                    }
+                } catch (Throwable e) {
+                    consumer.getExceptionHandler().handleException(e);
+                }
+            }
+        });
     }
 
     private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {

Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java (from r1329730, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java&r1=1329730&r2=1329798&rev=1329798&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutNonBlockingTest.java Tue Apr 24 15:40:27 2012
@@ -24,13 +24,15 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class NettyTextlineInOutSynchronousFalseTest extends BaseNettyTest {
+public class NettyTextlineInOutNonBlockingTest extends BaseNettyTest {
 
     private static String beforeThreadName;
     private static String afterThreadName;
+    private static String beforeThreadName2;
+    private static String afterThreadName2;
 
     @Test
-    public void testSynchronous() throws Exception {
+    public void testNonBlocking() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
 
         String reply = template.requestBody("direct:start", "Hello World", String.class);
@@ -39,6 +41,7 @@ public class NettyTextlineInOutSynchrono
         assertMockEndpointsSatisfied();
 
         assertFalse("Should not same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should not same threads", beforeThreadName2.equalsIgnoreCase(afterThreadName2));
     }
 
     @Override
@@ -53,7 +56,7 @@ public class NettyTextlineInOutSynchrono
                             beforeThreadName = Thread.currentThread().getName();
                         }
                     })
-                    .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&synchronous=false")
+                    .to("netty:tcp://localhost:{{port}}?textline=true&sync=true")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             afterThreadName = Thread.currentThread().getName();
@@ -62,9 +65,21 @@ public class NettyTextlineInOutSynchrono
                     .to("log:after")
                     .to("mock:result");
 
-                from("netty:tcp://localhost:{{port}}?textline=true&sync=true&synchronous=false")
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName2 = Thread.currentThread().getName();
+                        }
+                    })
                     // body should be a String when using textline codec
                     .validate(body().isInstanceOf(String.class))
+                    // async delayed is non-blocking
+                    .delay(100).asyncDelayed()
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName2 = Thread.currentThread().getName();
+                        }
+                    })
                     .transform(body().regexReplaceAll("Hello", "Bye"));
             }
         };