You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/05 17:17:22 UTC

svn commit: r960621 - in /camel/trunk/components: camel-mina/src/main/java/org/apache/camel/component/mina/ camel-mina/src/test/java/org/apache/camel/component/mina/ camel-mina/src/test/resources/ camel-netty/src/main/java/org/apache/camel/component/ne...

Author: davsclaus
Date: Mon Jul  5 15:17:21 2010
New Revision: 960621

URL: http://svn.apache.org/viewvc?rev=960621&view=rev
Log:
CAMEL-2907: NettyProducer supports async routing engine. CAMEL-2908: Added textline option to Netty.

Added:
    camel/trunk/components/camel-netty/src/test/data/
    camel/trunk/components/camel-netty/src/test/data/message1.txt   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java   (with props)
Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
    camel/trunk/components/camel-mina/src/test/resources/log4j.properties
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
    camel/trunk/components/camel-netty/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java Mon Jul  5 15:17:21 2010
@@ -70,7 +70,6 @@ public class MinaConfiguration implement
         return Charset.forName(encoding).name();
     }
 
-
     public String getProtocol() {
         return protocol;
     }

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Mon Jul  5 15:17:21 2010
@@ -89,7 +89,7 @@ public class MinaProducer extends Defaul
 
         // if textline enabled then covert to a String which must be used for textline
         if (endpoint.getConfiguration().isTextline()) {
-            body = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, exchange, body);
+            body = endpoint.getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
         }
 
         // if sync is true then we should also wait for a response (synchronous mode)

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java Mon Jul  5 15:17:21 2010
@@ -39,7 +39,7 @@ public class MinaVmTest extends ContextT
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from(uri).to("mock:result");
+                from(uri).to("log:before?showAll=true").to("mock:result").to("log:after?showAll=true");
             }
         };
     }

Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Mon Jul  5 15:17:21 2010
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, stdout
 
 #log4j.logger.org.apache.camel.component.mina=DEBUG
 #log4j.logger.org.apache.camel=DEBUG

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Mon Jul  5 15:17:21 2010
@@ -17,8 +17,11 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLEngine;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.apache.commons.logging.Log;
@@ -29,33 +32,25 @@ import org.jboss.netty.channel.ChannelPi
 import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
 
 public class ClientPipelineFactory implements ChannelPipelineFactory {
     private static final transient Log LOG = LogFactory.getLog(ClientPipelineFactory.class);
-    private NettyProducer producer;
-    private ChannelPipeline channelPipeline;
+    private final NettyProducer producer;
+    private final Exchange exchange;
+    private final AsyncCallback callback;
 
-    public ClientPipelineFactory(NettyProducer producer) {
+    public ClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) {
         this.producer = producer;
+        this.exchange = exchange;
+        this.callback = callback;
     }
 
     public ChannelPipeline getPipeline() throws Exception {
-        if (channelPipeline != null) {
-            // http://docs.jboss.org/netty/3.1/api/org/jboss/netty/handler/ssl/SslHandler.html
-            // To restart the SSL session, you must remove the existing closed SslHandler
-            // from the ChannelPipeline, insert a new SslHandler with a new SSLEngine into
-            // the pipeline, and start the handshake process as described in the first section.
-            if (channelPipeline.remove("ssl") != null) {
-                // reinitialize and add SSL first
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Client SSL handler re-initialized on the ChannelPipeline");
-                }
-                channelPipeline.addFirst("ssl", configureClientSSLOnDemand());
-            }
-            return channelPipeline;
-        }
-
-        channelPipeline = Channels.pipeline();
+        // create a new pipeline
+        ChannelPipeline channelPipeline = Channels.pipeline();
 
         SslHandler sslHandler = configureClientSSLOnDemand();
         if (sslHandler != null) {
@@ -65,6 +60,12 @@ public class ClientPipelineFactory imple
             channelPipeline.addLast("ssl", sslHandler);
         }
 
+        // use read timeout handler to handle timeout while waiting for a remote reply (while reading from the remote host)
+        if (producer.getConfiguration().getTimeout() > 0) {
+            Timer timer = new HashedWheelTimer();
+            channelPipeline.addLast("timeout", new ReadTimeoutHandler(timer, producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS));
+        }
+
         List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders();
         for (int x = 0; x < decoders.size(); x++) {
             channelPipeline.addLast("decoder-" + x, decoders.get(x));
@@ -75,11 +76,8 @@ public class ClientPipelineFactory imple
             channelPipeline.addLast("encoder-" + x, encoders.get(x));
         }
 
-        if (producer.getConfiguration().getHandler() != null) {
-            channelPipeline.addLast("handler", producer.getConfiguration().getHandler());
-        } else {
-            channelPipeline.addLast("handler", new ClientChannelHandler(producer));
-        }
+        // our handler must be added last
+        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));
 
         return channelPipeline;
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Mon Jul  5 15:17:21 2010
@@ -18,23 +18,29 @@ package org.apache.camel.component.netty
 
 import java.io.File;
 import java.net.URI;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.URISupport;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
-import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
 import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.CharsetUtil;
 
 @SuppressWarnings("unchecked")
 public class NettyConfiguration implements Cloneable {
+    private static final transient Log LOG = LogFactory.getLog(NettyConfiguration.class);
+
     private String protocol;
     private String host;
     private int port;
@@ -45,13 +51,14 @@ public class NettyConfiguration implemen
     private long timeout = 30000;
     private boolean reuseAddress = true;
     private boolean sync = true;
+    private boolean textline;
+    private String encoding;
     private String passphrase;
     private File keyStoreFile;
     private File trustStoreFile;
     private SslHandler sslHandler;
     private List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
     private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
-    private ChannelHandler handler;
     private boolean ssl;
     private long sendBufferSize = 65536;
     private long receiveBufferSize = 65536;
@@ -99,70 +106,53 @@ public class NettyConfiguration implemen
         keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null);
         trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null);
 
+        // set custom encoders and decoders first
         List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelDownstreamHandler.class, null);
         addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class);
         List<ChannelUpstreamHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelUpstreamHandler.class, null);
         addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class);
 
+        // then set parameters with the help of the camel context type converters
+        EndpointHelper.setProperties(component.getCamelContext(), this, parameters);
+
+        // add default encoders and decoders
         if (encoders.isEmpty() && decoders.isEmpty()) {
-            encoders.add(component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder()));
-            decoders.add(component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder()));
-        }
+            // are we textline or object?
+            if (isTextline()) {
+                Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
+                encoders.add(new StringEncoder(charset));
+                decoders.add(new StringDecoder(charset));
 
-        handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Using textline encoders and decoders with charset: " + charset);
+                }
+            } else {
+                // object serializable is then used
+                encoders.add(new ObjectEncoder());
+                decoders.add(new ObjectDecoder());
 
-        Map<String, Object> settings = URISupport.parseParameters(uri);
-        if (settings.containsKey("keepAlive")) {
-            setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive")));
-        }
-        if (settings.containsKey("tcpNoDelay")) {
-            setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay")));
-        }
-        if (settings.containsKey("broadcast")) {
-            setBroadcast(Boolean.valueOf((String) settings.get("broadcast")));
-        }
-        if (settings.containsKey("reuseAddress")) {
-            setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
-        }
-        if (settings.containsKey("connectTimeoutMillis")) {
-            setConnectTimeout(Long.valueOf((String) settings.get("connectTimeoutMillis")));
-        }
-        if (settings.containsKey("sync")) {
-            setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
-        }
-        if (settings.containsKey("receiveTimeoutMillis")) {
-            setTimeout(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
-        }
-        if (settings.containsKey("sendBufferSize")) {
-            setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
-        }
-        if (settings.containsKey("receiveBufferSize")) {
-            setReceiveBufferSize(Long.valueOf((String) settings.get("receiveBufferSize")));
-        }
-        if (settings.containsKey("ssl")) {
-            setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl")));
-        }
-        if (settings.containsKey("corePoolSize")) {
-            setCorePoolSize(Integer.valueOf((String) settings.get("corePoolSize")));
-        }
-        if (settings.containsKey("maxPoolSize")) {
-            setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
-        }
-        if (settings.containsKey("disconnect")) {
-            setDisconnect(Boolean.valueOf((String) settings.get("disconnect")));
-        }
-        if (settings.containsKey("lazyChannelCreation")) {
-            setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation")));
-        }
-        if (settings.containsKey("transferExchange")) {
-            setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Using object encoders and decoders");
+                }
+            }
+        } else {
+            LOG.debug("Using configured encoders and/or decoders");
         }
-        if (settings.containsKey("disconnectOnNoReply")) {
-            setDisconnectOnNoReply(Boolean.valueOf((String) settings.get("disconnectOnNoReply")));
+    }
+
+    public String getCharsetName() {
+        if (encoding == null) {
+            return null;
         }
-        if (settings.containsKey("noReplyLogLevel")) {
-            setNoReplyLogLevel(LoggingLevel.valueOf((String) settings.get("noReplyLogLevel")));
+        if (!Charset.isSupported(encoding)) {
+            throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
         }
+
+        return Charset.forName(encoding).name();
+    }
+
+    protected boolean isTcp() {
+        return protocol.equalsIgnoreCase("tcp");
     }
 
     public String getProtocol() {
@@ -237,6 +227,22 @@ public class NettyConfiguration implemen
         this.sync = sync;
     }
 
+    public boolean isTextline() {
+        return textline;
+    }
+
+    public void setTextline(boolean textline) {
+        this.textline = textline;
+    }
+
+    public String getEncoding() {
+        return encoding;
+    }
+
+    public void setEncoding(String encoding) {
+        this.encoding = encoding;
+    }
+
     public SslHandler getSslHandler() {
         return sslHandler;
     }
@@ -281,14 +287,6 @@ public class NettyConfiguration implemen
         this.decoders = decoders;
     }
 
-    public ChannelHandler getHandler() {
-        return handler;
-    }
-
-    public void setHandler(ChannelHandler handler) {
-        this.handler = handler;
-    }
-
     public long getTimeout() {
         return timeout;
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Mon Jul  5 15:17:21 2010
@@ -60,11 +60,15 @@ public class NettyConsumer extends Defau
 
     @Override
     protected void doStart() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Netty consumer binding to: " + configuration.getAddress());
+        }
+
         super.doStart();
-        if (configuration.getProtocol().equalsIgnoreCase("udp")) {
-            initializeUDPServerSocketCommunicationLayer();
-        } else {
+        if (isTcp()) {
             initializeTCPServerSocketCommunicationLayer();
+        } else {
+            initializeUDPServerSocketCommunicationLayer();
         }
 
         LOG.info("Netty consumer bound to: " + configuration.getAddress());
@@ -72,8 +76,8 @@ public class NettyConsumer extends Defau
 
     @Override
     protected void doStop() throws Exception {
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Netty consumer unbinding from: " + configuration.getAddress());
         }
 
         // close all channels
@@ -86,6 +90,12 @@ public class NettyConsumer extends Defau
         }
 
         super.doStop();
+
+        LOG.info("Netty consumer unbound from: " + configuration.getAddress());
+    }
+
+    public CamelContext getContext() {
+        return context;
     }
 
     public ChannelGroup getAllChannels() {
@@ -132,6 +142,10 @@ public class NettyConsumer extends Defau
         this.connectionlessServerBootstrap = connectionlessServerBootstrap;
     }
 
+    protected boolean isTcp() {
+        return configuration.getProtocol().equalsIgnoreCase("tcp");
+    }
+
     private void initializeTCPServerSocketCommunicationLayer() throws Exception {
         ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
                 configuration.getCorePoolSize(), configuration.getMaxPoolSize());

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Mon Jul  5 15:17:21 2010
@@ -47,7 +47,7 @@ public final class NettyHelper {
      * @throws CamelExchangeException is thrown if the body could not be written for some reasons
      *                                (eg remote connection is closed etc.)
      */
-    public static void writeBody(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
+    public static void writeBodySync(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
         // the write operation is asynchronous. Use future to wait until the session has been written
         ChannelFuture future;
         if (remoteAddress != null) {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Mon Jul  5 15:17:21 2010
@@ -17,18 +17,17 @@
 package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.ServicePoolAware;
-import org.apache.camel.component.netty.handlers.ClientChannelHandler;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.processor.Logger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -38,6 +37,7 @@ import org.jboss.netty.bootstrap.Connect
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -46,26 +46,19 @@ import org.jboss.netty.channel.socket.Da
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
-public class NettyProducer extends DefaultProducer implements ServicePoolAware {
+public class NettyProducer extends DefaultAsyncProducer implements ServicePoolAware {
     private static final transient Log LOG = LogFactory.getLog(NettyProducer.class);
-    private final ChannelGroup allChannels;
+    private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer");
     private CamelContext context;
     private NettyConfiguration configuration;
-    private CountDownLatch countdownLatch;
     private ChannelFactory channelFactory;
     private DatagramChannelFactory datagramChannelFactory;
-    private Channel channel;
-    private ClientBootstrap clientBootstrap;
-    private ConnectionlessBootstrap connectionlessClientBootstrap;
-    private ClientPipelineFactory clientPipelineFactory;
-    private ChannelPipeline clientPipeline;
     private Logger noReplyLogger;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
         super(nettyEndpoint);
         this.configuration = configuration;
         this.context = this.getEndpoint().getCamelContext();
-        this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
         this.noReplyLogger = new Logger(LOG, configuration.getNoReplyLogLevel());
     }
 
@@ -81,17 +74,27 @@ public class NettyProducer extends Defau
         return false;
     }
 
+    public CamelContext getContext() {
+        return context;
+    }
+
+    protected boolean isTcp() {
+        return configuration.getProtocol().equalsIgnoreCase("tcp");
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
-        if (configuration.getProtocol().equalsIgnoreCase("udp")) {
-            setupUDPCommunication();
-        } else {
+        if (isTcp()) {
             setupTCPCommunication();
+        } else {
+            setupUDPCommunication();
         }
+
         if (!configuration.isLazyChannelCreation()) {
-            openConnection();
+            // ensure the connection can be established when we start up
+            openAndCloseConnection();
         }
     }
 
@@ -100,83 +103,111 @@ public class NettyProducer extends Defau
         if (LOG.isDebugEnabled()) {
             LOG.debug("Stopping producer at address: " + configuration.getAddress());
         }
-        closeConnection();
+        // close all channels
+        ChannelGroupFuture future = ALL_CHANNELS.close();
+        future.awaitUninterruptibly();
+
+        // and then release other resources
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+        }
         super.doStop();
     }
 
-    public void process(Exchange exchange) throws Exception {
-        if (channel == null && !configuration.isLazyChannelCreation()) {
-            throw new IllegalStateException("Not started yet!");
-        }
-        if (channel == null || !channel.isConnected()) {
-            openConnection();
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
         }
 
         Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
         if (body == null) {
             noReplyLogger.log("No payload to send for exchange: " + exchange);
-            return; // exit early since nothing to write
+            callback.done(true);
+            return true;
+        }
+        // if textline is enabled then convert to a String, which must be used for textline
+        if (getConfiguration().isTextline()) {
+            try {
+                body = context.getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+            } catch (NoTypeConversionAvailableException e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
+        // set the exchange encoding property
+        if (getConfiguration().getCharsetName() != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, getConfiguration().getCharsetName());
         }
 
-        if (configuration.isSync()) {
-            // only initialize latch if we should get a response
-            countdownLatch = new CountDownLatch(1);
+        ChannelFuture channelFuture;
+        final Channel channel;
+        try {
+            channelFuture = openConnection(exchange, callback);
+            channel = openChannel(channelFuture);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         // log what we are writing
         if (LOG.isDebugEnabled()) {
-            Object out = body;
-            if (body instanceof byte[]) {
-                // byte arrays is not readable so convert to string
-                out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
-            }
-            LOG.debug("Writing body : " + out);
+            LOG.debug("Writing body: " + body);
         }
+        // write the body asynchronously
+        ChannelFuture future = channel.write(body);
 
-        // write the body
-        NettyHelper.writeBody(channel, null, body, exchange);
-
-        if (configuration.isSync()) {
-            boolean success = countdownLatch.await(configuration.getTimeout(), TimeUnit.MILLISECONDS);
-            if (!success) {
-                throw new ExchangeTimedOutException(exchange, configuration.getTimeout());
-            }
+        // add listener which handles the operation
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation complete " + channelFuture);
+                }
+                if (!channelFuture.isSuccess()) {
+                    // no success, so set the cause as the exception, signal the callback and stop
+                    exchange.setException(channelFuture.getCause());
+                    callback.done(false);
+                    return;
+                }
 
-            ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
-            if (handler.getCause() != null) {
-                throw new CamelExchangeException("Error occurred in ClientChannelHandler", exchange, handler.getCause());
-            } else if (!handler.isMessageReceived()) {
-                // no message received
-                throw new CamelExchangeException("No response received from remote server: " + configuration.getAddress(), exchange);
-            } else {
-                // set the result on either IN or OUT on the original exchange depending on its pattern
-                if (ExchangeHelper.isOutCapable(exchange)) {
-                    NettyPayloadHelper.setOut(exchange, handler.getMessage());
-                } else {
-                    NettyPayloadHelper.setIn(exchange, handler.getMessage());
+                // if we do not expect any reply then signal callback to continue routing
+                if (!configuration.isSync()) {
+                    try {
+                        // should channel be closed after complete?
+                        Boolean close;
+                        if (ExchangeHelper.isOutCapable(exchange)) {
+                            close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                        } else {
+                            close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                        }
+
+                        // should we disconnect, the header can override the configuration
+                        boolean disconnect = getConfiguration().isDisconnect();
+                        if (close != null) {
+                            disconnect = close;
+                        }
+                        if (disconnect) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
+                            }
+                            NettyHelper.close(channel);
+                        }
+                    } finally {
+                        // signal callback to continue routing
+                        callback.done(false);
+                    }
                 }
             }
-        }
-
-        // should channel be closed after complete?
-        Boolean close;
-        if (ExchangeHelper.isOutCapable(exchange)) {
-            close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
-        } else {
-            close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
-        }
+        });
 
-        // should we disconnect, the header can override the configuration
-        boolean disconnect = getConfiguration().isDisconnect();
-        if (close != null) {
-            disconnect = close;
-        }
-        if (disconnect) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
-            }
-            NettyHelper.close(channel);
-        }
+        // continue routing asynchronously
+        return false;
     }
 
     protected void setupTCPCommunication() throws Exception {
@@ -187,13 +218,6 @@ public class NettyProducer extends Defau
                     configuration.getCorePoolSize(), configuration.getMaxPoolSize());
             channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
         }
-        if (clientBootstrap == null) {
-            clientBootstrap = new ClientBootstrap(channelFactory);
-            clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-            clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-            clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-            clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
-        }
     }
 
     protected void setupUDPCommunication() throws Exception {
@@ -202,8 +226,29 @@ public class NettyProducer extends Defau
                     configuration.getCorePoolSize(), configuration.getMaxPoolSize());
             datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
         }
-        if (connectionlessClientBootstrap == null) {
-            connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+    }
+
+    private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws Exception {
+        ChannelFuture answer;
+
+        // initialize client pipeline factory
+        ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(this, exchange, callback);
+        // must get the pipeline from the factory when opening a new connection
+        ChannelPipeline clientPipeline = clientPipelineFactory.getPipeline();
+
+        if (isTcp()) {
+            ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
+            clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+            clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+            clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+            clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+
+            // set the pipeline on the bootstrap
+            clientBootstrap.setPipeline(clientPipeline);
+            answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            return answer;
+        } else {
+            ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
             connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
             connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
@@ -212,52 +257,38 @@ public class NettyProducer extends Defau
             connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
             connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
 
-        }
-    }
-
-    private void openConnection() throws Exception {
-        ChannelFuture channelFuture;
-
-        // initialize client pipeline factory
-        if (clientPipelineFactory == null) {
-            clientPipelineFactory = new ClientPipelineFactory(this);
-        }
-        // must get the pipeline from the factory when opening a new connection
-        clientPipeline = clientPipelineFactory.getPipeline();
-
-        if (clientBootstrap != null) {
-            clientBootstrap.setPipeline(clientPipeline);
-            channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        } else if (connectionlessClientBootstrap != null) {
+            // set the pipeline on the bootstrap
             connectionlessClientBootstrap.setPipeline(clientPipeline);
             connectionlessClientBootstrap.bind(new InetSocketAddress(0));
-            channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        } else {
-            throw new IllegalStateException("Should either be TCP or UDP");
+            answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            return answer;
         }
+    }
 
+    private Channel openChannel(ChannelFuture channelFuture) throws Exception {
+        // wait until we got connection
         channelFuture.awaitUninterruptibly();
         if (!channelFuture.isSuccess()) {
             throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
         }
-        channel = channelFuture.getChannel();
+        Channel channel = channelFuture.getChannel();
         // to keep track of all channels in use
-        allChannels.add(channel);
+        ALL_CHANNELS.add(channel);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating connector to address: " + configuration.getAddress());
         }
+        return channel;
     }
 
-    private void closeConnection() throws Exception {
-        // close all channels
-        ChannelGroupFuture future = allChannels.close();
-        future.awaitUninterruptibly();
-
-        // and then release other resources
-        if (channelFactory != null) {
-            channelFactory.releaseExternalResources();
-        }
+    private void openAndCloseConnection() throws Exception {
+        ChannelFuture future = openConnection(new DefaultExchange(context), new AsyncCallback() {
+            public void done(boolean doneSync) {
+                // noop
+            }
+        });
+        Channel channel = openChannel(future);
+        NettyHelper.close(channel);
     }
 
     public NettyConfiguration getConfiguration() {
@@ -268,10 +299,6 @@ public class NettyProducer extends Defau
         this.configuration = configuration;
     }
 
-    public CountDownLatch getCountdownLatch() {
-        return countdownLatch;
-    }
-
     public ChannelFactory getChannelFactory() {
         return channelFactory;
     }
@@ -280,23 +307,7 @@ public class NettyProducer extends Defau
         this.channelFactory = channelFactory;
     }
 
-    public ClientBootstrap getClientBootstrap() {
-        return clientBootstrap;
-    }
-
-    public void setClientBootstrap(ClientBootstrap clientBootstrap) {
-        this.clientBootstrap = clientBootstrap;
-    }
-
-    public ClientPipelineFactory getClientPipelineFactory() {
-        return clientPipelineFactory;
-    }
-
-    public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) {
-        this.clientPipelineFactory = clientPipelineFactory;
-    }
-
     public ChannelGroup getAllChannels() {
-        return allChannels;
+        return ALL_CHANNELS;
     }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Mon Jul  5 15:17:21 2010
@@ -17,7 +17,6 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
-
 import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
@@ -49,21 +48,19 @@ public class ServerPipelineFactory imple
             }
             channelPipeline.addLast("ssl", sslHandler);            
         }
-        List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
-        for (int x = 0; x < decoders.size(); x++) {
-            channelPipeline.addLast("decoder-" + x, decoders.get(x));
-        }
-
         List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders();
         for (int x = 0; x < encoders.size(); x++) {
             channelPipeline.addLast("encoder-" + x, encoders.get(x));
         }
-        if (consumer.getConfiguration().getHandler() != null) {
-            channelPipeline.addLast("handler", consumer.getConfiguration().getHandler());
-        } else {
-            channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+
+        List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            channelPipeline.addLast("decoder-" + x, decoders.get(x));
         }
-         
+
+        // our handler must be added last
+        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+
         return channelPipeline;
     }
     

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Mon Jul  5 15:17:21 2010
@@ -16,37 +16,41 @@
  */
 package org.apache.camel.component.netty.handlers;
 
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.camel.CamelException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyHelper;
+import org.apache.camel.component.netty.NettyPayloadHelper;
 import org.apache.camel.component.netty.NettyProducer;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.timeout.TimeoutException;
 
-@ChannelPipelineCoverage("all")
+/**
+ * Client handler which cannot be shared
+ */
 public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class);
-    private NettyProducer producer;
-    private Object message;
-    private Throwable cause;
+    private final NettyProducer producer;
+    private final Exchange exchange;
+    private final AsyncCallback callback;
     private boolean messageReceived;
+    private boolean exceptionHandled;
 
-    public ClientChannelHandler(NettyProducer producer) {
+    public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) {
         super();
         this.producer = producer;
-    }
-
-    public void reset() {
-        this.message = null;
-        this.cause = null;
-        this.messageReceived = false;
+        this.exchange = exchange;
+        this.callback = callback;
     }
 
     @Override
@@ -57,59 +61,109 @@ public class ClientChannelHandler extend
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
-        this.message = null;
-        this.messageReceived = false;
-        this.cause = exceptionEvent.getCause();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Exception caught at Channel: " + ctx.getChannel(), exceptionEvent.getCause());
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Closing channel as an exception was thrown from Netty", cause);
         }
-        // close channel in case an exception was thrown
-        NettyHelper.close(exceptionEvent.getChannel());
+        if (exceptionHandled) {
+            // ignore subsequent exceptions being thrown
+            return;
+        }
+
+        exceptionHandled = true;
+        Throwable cause = exceptionEvent.getCause();
+
+        // was it the timeout
+        if (cause instanceof TimeoutException) {
+            // timeout occurred
+            exchange.setException(new ExchangeTimedOutException(exchange, producer.getConfiguration().getTimeout()));
+
+            // signal callback
+            callback.done(false);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Closing channel as an exception was thrown from Netty", cause);
+            }
+            // set the cause on the exchange
+            exchange.setException(cause);
+
+            // close channel in case an exception was thrown
+            NettyHelper.close(exceptionEvent.getChannel());
+
+            // signal callback
+            callback.done(false);
+        }
     }
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        if (producer.getConfiguration().isSync() && !messageReceived) {
-            // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Channel closed: " + ctx.getChannel());
+        }
+
+        if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+            // session was closed but no message was received. This could be because the remote server had an internal error
+            // and could not return a response. Fail the exchange and signal the callback to stop waiting for a response
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Channel closed but no message received from address: " + producer.getConfiguration().getAddress());
             }
-            // session was closed but no message received. This could be because the remote server had an internal error
-            // and could not return a response. We should count down to stop waiting for a response
-            countDown();
+            exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+            // signal callback
+            callback.done(false);
         }
     }
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
-        message = messageEvent.getMessage();
         messageReceived = true;
-        cause = null;
 
+        Object body = messageEvent.getMessage();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Message received: " + message);
+            LOG.debug("Message received: " + body);
+        }
+
+        // if textline is enabled then convert to a String, which must be used for textline
+        if (producer.getConfiguration().isTextline()) {
+            try {
+                body = producer.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+            } catch (NoTypeConversionAvailableException e) {
+                exchange.setException(e);
+                callback.done(false);
+            }
         }
 
-        // signal we have received message
-        countDown();
-    }
 
-    protected void countDown() {
-        if (producer.getConfiguration().isSync()) {
-            producer.getCountdownLatch().countDown();
+        // set the result on either IN or OUT on the original exchange depending on its pattern
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            NettyPayloadHelper.setOut(exchange, body);
+        } else {
+            NettyPayloadHelper.setIn(exchange, body);
         }
-    }
 
-    public Object getMessage() {
-        return message;
-    }
+        try {
+            // should channel be closed after complete?
+            Boolean close;
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+            } else {
+                close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+            }
 
-    public boolean isMessageReceived() {
-        return messageReceived;
+            // should we disconnect, the header can override the configuration
+            boolean disconnect = producer.getConfiguration().isDisconnect();
+            if (close != null) {
+                disconnect = close;
+            }
+            if (disconnect) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing channel when complete at address: " + producer.getConfiguration().getAddress());
+                }
+                NettyHelper.close(ctx.getChannel());
+            }
+        } finally {
+            // signal callback
+            callback.done(false);
+        }
     }
 
-    public Throwable getCause() {
-        return cause;
-    }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Mon Jul  5 15:17:21 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.netty.handlers;
 
-import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.netty.NettyConstants;
@@ -27,14 +26,17 @@ import org.apache.camel.processor.Logger
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
-@ChannelPipelineCoverage("all")
+/**
+ * Server handler which is shared
+ */
+@ChannelHandler.Sharable
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class);
     private NettyConsumer consumer;
@@ -47,40 +49,48 @@ public class ServerChannelHandler extend
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Channel open: " + e.getChannel());
+        }
         // to keep track of open sockets
-        consumer.getAllChannels().add(channelStateEvent.getChannel());
+        consumer.getAllChannels().add(e.getChannel());
     }
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        LOG.debug("Channel closed: " + e.getChannel());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Channel closed: " + e.getChannel());
+        }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
-        LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+        // only close if we are still allowed to run
+        if (consumer.isRunAllowed()) {
+            LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
 
-        // close channel in case an exception was thrown
-        NettyHelper.close(exceptionEvent.getChannel());
+            // close channel in case an exception was thrown
+            NettyHelper.close(exceptionEvent.getChannel());
+        }
     }
     
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
         Object in = messageEvent.getMessage();
         if (LOG.isDebugEnabled()) {
-            if (in instanceof byte[]) {
-                // byte arrays is not readable so convert to string
-                in = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
-            }
             LOG.debug("Incoming message: " + in);
         }
-        
+
         // create Exchange and let the consumer process it
         Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
+        // set the exchange charset property for converting
+        if (consumer.getConfiguration().getCharsetName() != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, consumer.getConfiguration().getCharsetName());
+        }
 
         try {
             consumer.getProcessor().process(exchange);
@@ -123,14 +133,19 @@ public class ServerChannelHandler extend
                 NettyHelper.close(messageEvent.getChannel());
             }
         } else {
+            // if textline is enabled then convert to a String, which must be used for textline
+            if (consumer.getConfiguration().isTextline()) {
+                body = consumer.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+            }
+
             // we got a body to write
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Writing body: " + body);
             }
             if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
-                NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
+                NettyHelper.writeBodySync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
             } else {
-                NettyHelper.writeBody(messageEvent.getChannel(), null, body, exchange);
+                NettyHelper.writeBodySync(messageEvent.getChannel(), null, body, exchange);
             }
         }
 

Added: camel/trunk/components/camel-netty/src/test/data/message1.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/data/message1.txt?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/data/message1.txt (added)
+++ camel/trunk/components/camel-netty/src/test/data/message1.txt Mon Jul  5 15:17:21 2010
@@ -0,0 +1 @@
+Hello World
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/test/data/message1.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/data/message1.txt
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java Mon Jul  5 15:17:21 2010
@@ -20,7 +20,6 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
@@ -37,17 +36,14 @@ public class NettyExchangeTimeoutTest ex
     public void testUsingTimeoutParameter() throws Exception {
         // use a timeout value of 2 seconds (timeout is in millis) so we should actually get a response in this test
         Endpoint endpoint = this.context.getEndpoint("netty:tcp://localhost:" + PORT + "?timeout=2000");
-        Producer producer = endpoint.createProducer();
-        producer.start();
-        Exchange exchange = producer.createExchange();
-        exchange.getIn().setBody("Hello World");
+
         try {
-            producer.process(exchange);
-            fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+            template.sendBody(endpoint, "Hello World");
+            fail("Should have thrown a exception");
         } catch (Exception e) {
-            assertTrue("Should have thrown an ExchangeTimedOutException", e instanceof ExchangeTimedOutException);
+            ExchangeTimedOutException timeout = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, timeout.getTimeout());
         }
-        producer.stop();
     }
 
     protected RouteBuilder createRouteBuilder() {

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java Mon Jul  5 15:17:21 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that a file read by the file component can be sent one-way over a
+ * netty TCP endpoint using the textline option and reaches the netty consumer.
+ *
+ * @version $Revision$
+ */
+public class NettyFileTcpTest extends CamelTestSupport {
+
+    // the very same one-way (sync=false) textline endpoint is used by the consumer and the producer
+    private static final String NETTY_URI = "netty:tcp://localhost:9123?sync=false&textline=true";
+
+    // NOTE(review): the method name mentions Mina although both routes use netty endpoints
+    @Test
+    public void testMinaRoute() throws Exception {
+        MockEndpoint results = getMockEndpoint("mock:results");
+        results.expectedMessageCount(1);
+        results.message(0).body().startsWith("Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                // server side: whatever the netty consumer receives goes to the mock endpoint
+                from(NETTY_URI).to("mock:results");
+
+                // client side: message1.txt from src/test/data is sent to the server
+                from("file:src/test/data?noop=true&fileName=message1.txt").to(NETTY_URI);
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Mon Jul  5 15:17:21 2010
@@ -62,7 +62,8 @@ public class NettyInOutWithForcedNoRespo
                         .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
                         .otherwise().transform(constant(null));
 
-                from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+                from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=INFO")
+                    .to("log:foo")
                     .choice()
                         .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
                         .otherwise().transform(constant(null));

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java Mon Jul  5 15:17:21 2010
@@ -82,6 +82,7 @@ public class NettyTCPAsyncTest extends C
             @Override
             public void configure() throws Exception {
                 from("netty:tcp://localhost:5150?sync=false")
+                    .to("log:result")
                     .to("mock:result");                
             }
         };

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java Mon Jul  5 15:17:21 2010
@@ -34,34 +34,31 @@ public class NettyTCPSyncTest extends Ca
 
     @Test
     public void testTCPStringInOutWithNettyConsumer() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Beginning Test ---> testTCPInOutWithNettyConsumer()");
-        }
-        
         String response = producerTemplate.requestBody(
             "netty:tcp://localhost:5150?sync=true", 
             "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class);        
         assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
-  
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Completed Test ---> testTCPInOutWithNettyConsumer()");
-        }
+    }
+
+    @Test
+    public void testTCPStringInOutWithNettyConsumer2Times() throws Exception {
+        String response = producerTemplate.requestBody(
+            "netty:tcp://localhost:5150?sync=true",
+            "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class);
+        assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
+
+        response = producerTemplate.requestBody(
+            "netty:tcp://localhost:5150?sync=true",
+            "Hello World", String.class);
+        assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
     }
 
     @Test
     public void testTCPObjectInOutWithNettyConsumer() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Beginning Test ---> testUDPInOutWithNettyConsumer()");
-        }
-        
         Poetry poetry = new Poetry();
         Poetry response = (Poetry) producerTemplate.requestBody("netty:tcp://localhost:5150?sync=true", poetry);        
         assertEquals("Dr. Sarojini Naidu", response.getPoet());
-        
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Completed Test ---> testUDPInOutWithNettyConsumer()");
-        }
-    }  
+    }
     
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java Mon Jul  5 15:17:21 2010
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTcpWithInOutUsingPlainSocketTest extends CamelTestSupport {
+
+    private static final int PORT = 6333;
+    // use parameter sync=true to force InOut pattern
+    protected String uri = "netty:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+    @Test
+    public void testSendAndReceiveOnce() throws Exception {
+        String response = sendAndReceive("World");
+
+        assertNotNull("Nothing received from Mina", response);
+        assertEquals("Hello World", response);
+    }
+
+    @Test
+    public void testSendAndReceiveTwice() throws Exception {
+        String london = sendAndReceive("London");
+        String paris = sendAndReceive("Paris");
+
+        assertNotNull("Nothing received from Mina", london);
+        assertNotNull("Nothing received from Mina", paris);
+        assertEquals("Hello London", london);
+        assertEquals("Hello Paris", paris);
+    }
+
+    @Test
+    public void testReceiveNoResponseSinceOutBodyIsNull() throws Exception {
+        String out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+    }
+
+    @Test
+    public void testReceiveNoResponseSinceOutBodyIsNullTwice() throws Exception {
+        String out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+
+        out = sendAndReceive("force-null-out-body");
+        assertNull("no data should be recieved", out);
+    }
+
+    @Test
+    public void testExchangeFailedOutShouldBeNull() throws Exception {
+        String out = sendAndReceive("force-exception");
+        assertTrue("out should not be the same as in when the exchange has failed", !"force-exception".equals(out));
+        assertEquals("should get the exception here", out, "java.lang.IllegalArgumentException: Forced exception");
+    }
+
+    private String sendAndReceive(String input) throws IOException {
+        byte buf[] = new byte[128];
+
+        Socket soc = new Socket();
+        soc.connect(new InetSocketAddress("localhost", PORT));
+
+        // Send message using plain Socket to test if this works
+        OutputStream os = null;
+        InputStream is = null;
+        try {
+            os = soc.getOutputStream();
+            os.write(input.getBytes());
+
+            is = soc.getInputStream();
+            int len = is.read(buf);
+            if (len == -1) {
+                // no data received
+                return null;
+            }
+        } finally {
+            if (is != null) {
+                is.close();
+            }
+            if (os != null) {
+                os.close();
+            }
+            soc.close();
+        }
+
+        // convert the buffer to chars
+        StringBuilder sb = new StringBuilder();
+        for (byte b : buf) {
+            char ch = (char) b;
+            if (ch == '\n' || ch == 0) {
+                // newline denotes end of text (added in the end in the processor below)
+                break;
+            } else {
+                sb.append(ch);
+            }
+        }
+
+        return sb.toString();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) {
+                        String in = e.getIn().getBody(String.class);
+                        if ("force-null-out-body".equals(in)) {
+                            // forcing a null out body
+                            e.getOut().setBody(null);
+                        } else if ("force-exception".equals(in)) {
+                            // clear out before throwing exception
+                            e.getOut().setBody(null);
+                            throw new IllegalArgumentException("Forced exception");
+                        } else {
+                            e.getOut().setBody("Hello " + in);
+                        }
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java Mon Jul  5 15:17:21 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sends a text message one-way (sync=false) to a netty TCP endpoint that uses
+ * the textline option and checks that it reaches the consumer as a String.
+ *
+ * @version $Revision$
+ */
+public class NettyTextlineInOnlyTest extends CamelTestSupport {
+
+    // endpoint shared by the producer in the test and the consumer in the route
+    private static final String TEXTLINE_URI = "netty:tcp://localhost:5149?textline=true&sync=false";
+    private static final String PAYLOAD = "Hello World\nHow are you?";
+
+    @Test
+    public void testTextlineInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived(PAYLOAD);
+
+        template.sendBody(TEXTLINE_URI, PAYLOAD);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // body should be a String when using textline codec
+                from(TEXTLINE_URI).validate(body().isInstanceOf(String.class)).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java Mon Jul  5 15:17:21 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sends a request (sync=true) to a netty TCP endpoint that uses the textline
+ * option and checks both the body seen by the consumer and the reply returned
+ * to the caller.
+ *
+ * @version $Revision$
+ */
+public class NettyTextlineInOutTest extends CamelTestSupport {
+
+    // endpoint shared by the producer in the test and the consumer in the route
+    private static final String TEXTLINE_URI = "netty:tcp://localhost:5148?textline=true&sync=true";
+
+    @Test
+    public void testTextlineInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        String answer = template.requestBody(TEXTLINE_URI, "Hello World", String.class);
+        assertEquals("Bye World", answer);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // body should be a String when using textline codec;
+                // the reply is the request with "Hello" replaced by "Bye"
+                from(TEXTLINE_URI)
+                    .validate(body().isInstanceOf(String.class))
+                    .to("mock:result")
+                    .transform(body().regexReplaceAll("Hello", "Bye"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java Mon Jul  5 15:17:21 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Sends a datagram through a plain {@link DatagramSocket} to a netty UDP
+ * consumer (sync=true) and checks the reply received on the same socket.
+ *
+ * @version $Revision$
+ */
+public class NettyUdpWithInOutUsingPlainSocketTest extends CamelTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(NettyUdpWithInOutUsingPlainSocketTest.class);
+    private static final int PORT = 4445;
+
+    @Test
+    public void testSendAndReceiveOnce() throws Exception {
+        String out = sendAndReceiveUdpMessages("World");
+        assertNotNull("should receive data", out);
+        assertEquals("Hello World", out);
+    }
+
+    /**
+     * Sends the input as a single datagram to the server and blocks until one
+     * datagram has been received in return.
+     *
+     * @param input the text to send
+     * @return the content of the received datagram as a String
+     * @throws Exception if sending or receiving fails, or the thread is interrupted while sleeping
+     */
+    private String sendAndReceiveUdpMessages(String input) throws Exception {
+        DatagramSocket socket = new DatagramSocket();
+        try {
+            InetAddress address = InetAddress.getByName("127.0.0.1");
+
+            byte[] data = input.getBytes();
+
+            DatagramPacket packet = new DatagramPacket(data, data.length, address, PORT);
+            LOG.debug("+++ Sending data +++");
+            socket.send(packet);
+
+            Thread.sleep(1000);
+
+            byte[] buf = new byte[128];
+            DatagramPacket receive = new DatagramPacket(buf, buf.length, address, PORT);
+            LOG.debug("+++ Receiving data +++");
+            socket.receive(receive);
+
+            return new String(receive.getData(), 0, receive.getLength());
+        } finally {
+            // always release the socket, also when send/receive throws
+            socket.close();
+        }
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("netty:udp://127.0.0.1:" + PORT + "?textline=true&sync=true").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String s = exchange.getIn().getBody(String.class);
+                        LOG.debug("Server got: " + s);
+                        exchange.getOut().setBody("Hello " + s);
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Mon Jul  5 15:17:21 2010
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, file
 
 # uncomment the following to enable camel debugging
-log4j.logger.org.apache.camel.component.netty=DEBUG
+#log4j.logger.org.apache.camel.component.netty=TRACE
 #log4j.logger.org.apache.camel=DEBUG
 #log4j.logger.org.apache.commons.net=TRACE