You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 13:13:42 UTC

svn commit: r941661 - in /camel/trunk/components: camel-mina/src/main/java/org/apache/camel/component/mina/ camel-netty/src/main/java/org/apache/camel/component/netty/ camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ camel-netty/src...

Author: davsclaus
Date: Thu May  6 11:13:41 2010
New Revision: 941661

URL: http://svn.apache.org/viewvc?rev=941661&view=rev
Log:
CAMEL-2699: Improve camel-netty to properly shutdown. Also add features which we have in camel-mina but wasnt ported to camel-netty yet.

Added:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java   (with props)
Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
    camel/trunk/components/camel-netty/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu May  6 11:13:41 2010
@@ -105,7 +105,12 @@ public class MinaConsumer extends Defaul
             if (endpoint.getConfiguration().getCharsetName() != null) {
                 exchange.setProperty(Exchange.CHARSET_NAME, endpoint.getConfiguration().getCharsetName());
             }
-            getProcessor().process(exchange);
+
+            try {
+                getProcessor().process(exchange);
+            } catch (Throwable e) {
+                getExceptionHandler().handleException(e);
+            }
 
             // if sync then we should return a response
             if (sync) {

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java Thu May  6 11:13:41 2010
@@ -39,7 +39,7 @@ public final class MinaHelper {
      *
      * @param session  the MINA session
      * @param body     the body to write (send)
-     * @param exchange the mina exchange used for error reporting
+     * @param exchange the exchange
      * @throws CamelExchangeException is thrown if the body could not be written for some reasons
      *                                (eg remote connection is closed etc.)
      */
@@ -48,6 +48,9 @@ public final class MinaHelper {
         WriteFuture future = session.write(body);
         // must use a timeout (we use 10s) as in some very high performance scenarios a write can cause 
         // thread hanging forever
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for write to complete");
+        }
         future.join(10 * 1000L);
         if (!future.isWritten()) {
             LOG.warn("Cannot write body: " + body + " using session: " + session);

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May  6 11:13:41 2010
@@ -57,6 +57,7 @@ public class NettyConfiguration {
     private int maxPoolSize;
     private String keyStoreFormat;
     private String securityProvider;
+    private boolean disconnect;
 
     public NettyConfiguration() {
         setKeepAlive(true);
@@ -139,6 +140,9 @@ public class NettyConfiguration {
         if (settings.containsKey("maxPoolSize")) {
             setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
         }
+        if (settings.containsKey("disconnect")) {
+            setDisconnect(Boolean.valueOf((String) settings.get("disconnect")));
+        }
     }
 
     public String getProtocol() {
@@ -221,7 +225,6 @@ public class NettyConfiguration {
         this.sslHandler = sslHandler;
     }
 
-
     public List<ChannelDownstreamHandler> getEncoders() {
         return encoders;
     }
@@ -354,6 +357,18 @@ public class NettyConfiguration {
         this.securityProvider = securityProvider;
     }
 
+    public boolean isDisconnect() {
+        return disconnect;
+    }
+
+    public void setDisconnect(boolean disconnect) {
+        this.disconnect = disconnect;
+    }
+
+    public String getAddress() {
+        return host + ":" + port;
+    }
+
     private <T> void addToHandlersList(List configured, List handlers, Class<? extends T> handlerType) {
         if (handlers != null) {
             for (int x = 0; x < handlers.size(); x++) {

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May  6 11:13:41 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+/**
+ * Netty constants
+ *
+ * @version $Revision$
+ */
+public final class NettyConstants {
+
+    public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
+    public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
+    public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+
+    private NettyConstants() {
+        // Utility class
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May  6 11:13:41 2010
@@ -20,12 +20,14 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -39,12 +41,17 @@ public class NettyConsumer extends Defau
     private DatagramChannelFactory datagramChannelFactory;
     private ServerBootstrap serverBootstrap;
     private ConnectionlessBootstrap connectionlessServerBootstrap;
-    
-    public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor,
-        NettyConfiguration configuration) {
+    private Channel channel;
+
+    public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
         super(nettyEndpoint, processor);
-        this.configuration = nettyEndpoint.getConfiguration();
         this.context = this.getEndpoint().getCamelContext();
+        this.configuration = configuration;
+    }
+
+    @Override
+    public NettyEndpoint getEndpoint() {
+        return (NettyEndpoint) super.getEndpoint();
     }
 
     @Override
@@ -55,45 +62,28 @@ public class NettyConsumer extends Defau
         } else {
             initializeTCPServerSocketCommunicationLayer();
         }
+
+        LOG.info("Netty consumer bound to: " + configuration.getAddress());
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop(); 
-    }
-    
-    private void initializeTCPServerSocketCommunicationLayer() throws Exception {
-        ExecutorService bossExecutor = 
-            context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-        ExecutorService workerExecutor = 
-            context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-        channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
-        serverBootstrap = new ServerBootstrap(channelFactory);
-        
-        serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
-        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-        serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
-        serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));        
-        LOG.info("Netty TCP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
-    }
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
+        }
 
-    private void initializeUDPServerSocketCommunicationLayer() throws Exception {
-        ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-        datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);        
-        connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
-        
-        connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
-        connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-        connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-        connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-        connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
-        connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
-        connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
-        connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
-        connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        LOG.info("Netty UDP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
+
+        if (channel != null) {
+            NettyHelper.close(channel);
+        }
+
+        // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping
+        // and then releasing channel factory would be faster
+//        if (channelFactory != null) {
+//            channelFactory.releaseExternalResources();
+//        }
+
+        super.doStop();
     }
     
     public NettyConfiguration getConfiguration() {
@@ -112,13 +102,11 @@ public class NettyConsumer extends Defau
         this.channelFactory = channelFactory;
     }
 
-
     public DatagramChannelFactory getDatagramChannelFactory() {
         return datagramChannelFactory;
     }
 
-    public void setDatagramChannelFactory(
-        DatagramChannelFactory datagramChannelFactory) {
+    public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) {
         this.datagramChannelFactory = datagramChannelFactory;
     } 
     
@@ -134,9 +122,43 @@ public class NettyConsumer extends Defau
         return connectionlessServerBootstrap;
     }
 
-    public void setConnectionlessServerBootstrap(
-            ConnectionlessBootstrap connectionlessServerBootstrap) {
+    public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) {
         this.connectionlessServerBootstrap = connectionlessServerBootstrap;
     } 
     
+    private void initializeTCPServerSocketCommunicationLayer() throws Exception {
+        ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
+                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+        ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker",
+                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+
+        channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+        serverBootstrap = new ServerBootstrap(channelFactory);
+        serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+
+        channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+    }
+
+    private void initializeUDPServerSocketCommunicationLayer() throws Exception {
+        ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker",
+                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+
+        datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+        connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+        connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+        connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+        connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+        connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+
+        channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+    }
+
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May  6 11:13:41 2010
@@ -43,13 +43,13 @@ public class NettyEndpoint extends Defau
 
     public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) {
         Exchange exchange = createExchange();
-        exchange.getIn().setHeader("NettyChannelHandlerContext", ctx);
-        exchange.getIn().setHeader("NettyMessageEvent", messageEvent);
+        exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
+        exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
         return exchange;        
     }
     
     public boolean isSingleton() {
-        return false;
+        return true;
     }
 
     public NettyConfiguration getConfiguration() {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Thu May  6 11:13:41 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty;
 
+import java.net.SocketAddress;
+
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.commons.logging.Log;
@@ -45,17 +47,34 @@ public final class NettyHelper {
      * @throws CamelExchangeException is thrown if the body could not be written for some reasons
      *                                (eg remote connection is closed etc.)
      */
-    public static void writeBody(Channel channel, Object body, Exchange exchange) throws CamelExchangeException {
+    public static void writeBody(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
         // the write operation is asynchronous. Use future to wait until the session has been written
-        ChannelFuture future = channel.write(body);
+        ChannelFuture future;
+        if (remoteAddress != null) {
+            future = channel.write(body, remoteAddress);
+        } else {
+            future = channel.write(body);
+        }
 
         // wait for the write
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for write to complete");
+        }
         future.awaitUninterruptibly();
 
         // if it was not a success then thrown an exception
         if (future.isSuccess() == false) {
             LOG.warn("Cannot write body: " + body + " using channel: " + channel);
-            throw new CamelExchangeException("Cannot write body", exchange);
+            throw new CamelExchangeException("Cannot write body", exchange, future.getCause());
+        }
+    }
+
+    public static void close(Channel channel) {
+        if (channel != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Closing channel: " + channel);
+            }
+            channel.close().awaitUninterruptibly();
         }
     }
 

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May  6 11:13:41 2010
@@ -22,11 +22,13 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -56,7 +58,19 @@ public class NettyProducer extends Defau
         super(nettyEndpoint);
         this.configuration = configuration;
         this.context = this.getEndpoint().getCamelContext();
-    } 
+    }
+
+    @Override
+    public NettyEndpoint getEndpoint() {
+        return (NettyEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        // the producer should not be singleton otherwise cannot use concurrent producers and safely
+        // use request/reply with correct correlation
+        return false;
+    }
 
     @Override
     protected void doStart() throws Exception {
@@ -70,14 +84,17 @@ public class NettyProducer extends Defau
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-    }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping producer at address: " + configuration.getAddress());
+        }
+        if (channelFuture != null) {
+            NettyHelper.close(channelFuture.getChannel());
+        }
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+        }
 
-    @Override
-    public boolean isSingleton() {
-        // the producer should not be singleton otherwise cannot use concurrent producers and safely
-        // use request/reply with correct correlation
-        return false;
+        super.doStop();
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -87,7 +104,7 @@ public class NettyProducer extends Defau
 
         // write the body
         Channel channel = channelFuture.getChannel();
-        NettyHelper.writeBody(channel, exchange.getIn().getBody(), exchange);
+        NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
 
         if (configuration.isSync()) {
             boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
@@ -96,21 +113,35 @@ public class NettyProducer extends Defau
             }
             Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse();
             exchange.getOut().setBody(response);
-        }                 
+        }
+
+        // should channel be closed after complete?
+        Boolean close;
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+        } else {
+            close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+        }
+
+        // should we disconnect, the header can override the configuration
+        boolean disconnect = getConfiguration().isDisconnect();
+        if (close != null) {
+            disconnect = close;
+        }
+        if (disconnect) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
+            }
+            NettyHelper.close(channel);
+        }
     }
 
     protected void setupTCPCommunication() throws Exception {
         if (channelFactory == null) {
-            ExecutorService bossExecutor = 
-                context.getExecutorServiceStrategy().newThreadPool(this, 
-                    "NettyTCPBoss", 
-                    configuration.getCorePoolSize(), 
-                    configuration.getMaxPoolSize());
-            ExecutorService workerExecutor = 
-                context.getExecutorServiceStrategy().newThreadPool(this, 
-                    "NettyTCPWorker", 
-                    configuration.getCorePoolSize(), 
-                    configuration.getMaxPoolSize());
+            ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
+                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+            ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker",
+                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
             channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
         }
         if (clientBootstrap == null) {
@@ -125,18 +156,20 @@ public class NettyProducer extends Defau
             clientPipeline = clientPipelineFactory.getPipeline();
             clientBootstrap.setPipeline(clientPipeline);
         }
-        channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); 
+
+        channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         channelFuture.awaitUninterruptibly();
-        LOG.info("Netty TCP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort());
+        if (!channelFuture.isSuccess()) {
+            throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+        }
+
+        LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress());
     }
-    
+
     protected void setupUDPCommunication() throws Exception {
         if (datagramChannelFactory == null) {
-            ExecutorService workerExecutor = 
-                context.getExecutorServiceStrategy().newThreadPool(this, 
-                    "NettyUDPWorker", 
-                    configuration.getCorePoolSize(), 
-                    configuration.getMaxPoolSize());
+            ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker",
+                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
             datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
         }
         if (connectionlessClientBootstrap == null) {
@@ -155,12 +188,17 @@ public class NettyProducer extends Defau
             clientPipeline = clientPipelineFactory.getPipeline();
             connectionlessClientBootstrap.setPipeline(clientPipeline);
         }
+
         connectionlessClientBootstrap.bind(new InetSocketAddress(0));
-        channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); 
+        channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         channelFuture.awaitUninterruptibly();
-        LOG.info("Netty UDP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort());
-    }    
-    
+        if (!channelFuture.isSuccess()) {
+            throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+        }
+
+        LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress());
+    }
+
     public NettyConfiguration getConfiguration() {
         return configuration;
     }
@@ -216,5 +254,5 @@ public class NettyProducer extends Defau
     public void setClientPipeline(ChannelPipeline clientPipeline) {
         this.clientPipeline = clientPipeline;
     }
-    
+
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May  6 11:13:41 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty.handlers;
 
+import org.apache.camel.CamelException;
+import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,20 +39,25 @@ public class ClientChannelHandler extend
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
-        throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("An exception was caught by the ClientChannelHandler during communication", exceptionEvent.getCause());
+            LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
         }
+        // close channel in case an exception was thrown
+        NettyHelper.close(exceptionEvent.getChannel());
+
+        // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+        throw new CamelException(exceptionEvent.getCause());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
-        throws Exception {
-        response = messageEvent.getMessage();
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+        setResponse(messageEvent.getMessage());
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Incoming message:" + response);
         }
+
         if (producer.getConfiguration().isSync()) {
             producer.getCountdownLatch().countDown();
         }        

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May  6 11:13:41 2010
@@ -16,17 +16,15 @@
  */
 package org.apache.camel.component.netty.handlers;
 
-import java.net.InetSocketAddress;
-
-import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyConsumer;
-import org.apache.camel.component.netty.NettyEndpoint;
+import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -44,37 +42,55 @@ public class ServerChannelHandler extend
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
-        throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("An exception was caught by the ServerChannelHandler during communication", exceptionEvent.getCause());
+            LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
         }
+        // close channel in case an exception was thrown
+        NettyHelper.close(exceptionEvent.getChannel());
+
+        // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+        throw new CamelException(exceptionEvent.getCause());
     }
     
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
-        throws Exception {
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
         Object in = messageEvent.getMessage();
         if (LOG.isDebugEnabled()) {
             if (in instanceof byte[]) {
+                // byte arrays is not readable so convert to string
                 in = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
             }
             LOG.debug("Incoming message: " + in);
         }
         
-        // Dispatch exchange along the route and receive the final resulting exchange
-        dispatchExchange(ctx, messageEvent, in); 
+        // create Exchange and let the consumer process it
+        Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+        if (consumer.getConfiguration().isSync()) {
+            exchange.setPattern(ExchangePattern.InOut);
+        }
+        exchange.getIn().setBody(in);
+
+        try {
+            consumer.getProcessor().process(exchange);
+        } catch (Throwable e) {
+            consumer.getExceptionHandler().handleException(e);
+        }
+
+        // send back response if the communication is synchronous
+        if (consumer.getConfiguration().isSync()) {
+            sendResponse(messageEvent, exchange);
+        }
     }
 
-    private void sendResponsetoChannel(MessageEvent messageEvent, Exchange exchange) throws Exception {
-        ChannelFuture future;
+    private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {
         Object body;
         if (ExchangeHelper.isOutCapable(exchange)) {
             body = exchange.getOut().getBody();
         } else {
             body = exchange.getIn().getBody();
         }
-        
+
         if (exchange.isFailed()) {
             if (exchange.getException() == null) {
                 // fault detected
@@ -83,54 +99,43 @@ public class ServerChannelHandler extend
                 body = exchange.getException();
             }
         }
-        
+
         if (body == null) {
-            LOG.warn("No Oubound Response received following route completion: " + exchange);
-            LOG.warn("A response cannot be sent to the Client");
-            messageEvent.getChannel().close();
-        }
-        
-        if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
-            future = messageEvent.getChannel().write(body, messageEvent.getRemoteAddress());
+            // must close session if no data to write otherwise client will never receive a response
+            // and wait forever (if not timing out)
+            LOG.warn("Cannot write body since its null, closing channel: " + exchange);
+            NettyHelper.close(messageEvent.getChannel());
         } else {
-            future = messageEvent.getChannel().write(body);
-        }
-        
-        if (!future.isSuccess()) {
-            String hostname = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getHostName();
-            int port = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getPort();
-            throw new CamelExchangeException("Could not send response via Channel to remote host " + hostname + " and port " + port, exchange);
-        }
-        
-        if (LOG.isDebugEnabled()) {
-            if (body instanceof byte[]) {
-                body = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body);
+            // we got a body to write
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Writing body" + body);
+            }
+            if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
+                NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
+            } else {
+                NettyHelper.writeBody(messageEvent.getChannel(), null, body, exchange);
             }
-            LOG.debug("Sent Outgoing message: " + body);
-        }        
-    }
-
-    private void dispatchExchange(ChannelHandlerContext ctx, MessageEvent messageEvent, Object in) throws Exception {        
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Consumer Dispatching the Incoming exchange along the route");
         }
 
-        Exchange exchange = ((NettyEndpoint)consumer.getEndpoint()).createExchange(ctx, messageEvent);
-        if (consumer.getConfiguration().isSync()) {
-            exchange.setPattern(ExchangePattern.InOut);
+        // should channel be closed after complete?
+        Boolean close;
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+        } else {
+            close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
         }
-        exchange.getIn().setBody(in);
-        
-        try {
-            consumer.getProcessor().process(exchange);
-        } catch (Exception exception) {
-            throw new CamelExchangeException("Error in consumer while dispatching exchange for further processing", exchange);
+
+        // should we disconnect, the header can override the configuration
+        boolean disconnect = consumer.getConfiguration().isDisconnect();
+        if (close != null) {
+            disconnect = close;
         }
-        
-        // Send back response if the communication is synchronous
-        if (consumer.getConfiguration().isSync()) {
-            sendResponsetoChannel(messageEvent, exchange);
+        if (disconnect) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Closing channel when complete at address: " + messageEvent.getRemoteAddress());
+            }
+            NettyHelper.close(messageEvent.getChannel());
         }
     }
-    
+
 }

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java Thu May  6 11:13:41 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyDisconnectTest extends CamelTestSupport {
+
+    private String uri = "netty:tcp://localhost:8080?sync=true&disconnect=true";
+
+    @Test
+    public void testCloseSessionWhenComplete() throws Exception {
+        Object out = template.requestBody(uri, "Claus");
+        assertEquals("Bye Claus", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(uri).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java Thu May  6 11:13:41 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyInOutCloseChannelWhenCompleteTest extends CamelTestSupport {
+
+    @Test
+    public void testCloseSessionWhenComplete() throws Exception {
+        Object out = template.requestBody("netty:tcp://localhost:8080?sync=true", "Claus");
+        assertEquals("Bye Claus", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:8080?sync=true").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                        exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Thu May  6 11:13:41 2010
@@ -18,12 +18,12 @@
 #
 # The logging properties used for eclipse testing, We want to see debug output on the console.
 #
-log4j.rootLogger=DEBUG, file
+log4j.rootLogger=INFO, file
 
 # uncomment the following to enable camel debugging
 log4j.logger.org.apache.camel.component.netty=DEBUG
-log4j.logger.org.apache.camel=DEBUG
-log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.commons.net=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender