You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 14:40:14 UTC

svn commit: r941699 - in /camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty: NettyConsumer.java NettyProducer.java handlers/ClientChannelHandler.java handlers/ServerChannelHandler.java

Author: davsclaus
Date: Thu May  6 12:40:14 2010
New Revision: 941699

URL: http://svn.apache.org/viewvc?rev=941699&view=rev
Log:
CAMEL-2699: camel-netty now shutdown properly.

Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May  6 12:40:14 2010
@@ -20,7 +20,6 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.commons.logging.Log;
@@ -29,12 +28,16 @@ import org.jboss.netty.bootstrap.Connect
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 public class NettyConsumer extends DefaultConsumer {
     private static final transient Log LOG = LogFactory.getLog(NettyConsumer.class);
+    private final ChannelGroup allChannels;
     private CamelContext context;
     private NettyConfiguration configuration;
     private ChannelFactory channelFactory;
@@ -47,6 +50,7 @@ public class NettyConsumer extends Defau
         super(nettyEndpoint, processor);
         this.context = this.getEndpoint().getCamelContext();
         this.configuration = configuration;
+        this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
     }
 
     @Override
@@ -72,20 +76,22 @@ public class NettyConsumer extends Defau
             LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
         }
 
-
-        if (channel != null) {
-            NettyHelper.close(channel);
+        // close all channels
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // and then release other resources
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
         }
 
-        // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping
-        // and then releasing channel factory would be faster
-//        if (channelFactory != null) {
-//            channelFactory.releaseExternalResources();
-//        }
-
         super.doStop();
     }
-    
+
+    public ChannelGroup getAllChannels() {
+        return allChannels;
+    }
+
     public NettyConfiguration getConfiguration() {
         return configuration;
     }
@@ -108,8 +114,8 @@ public class NettyConsumer extends Defau
 
     public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) {
         this.datagramChannelFactory = datagramChannelFactory;
-    } 
-    
+    }
+
     public ServerBootstrap getServerBootstrap() {
         return serverBootstrap;
     }
@@ -124,8 +130,8 @@ public class NettyConsumer extends Defau
 
     public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) {
         this.connectionlessServerBootstrap = connectionlessServerBootstrap;
-    } 
-    
+    }
+
     private void initializeTCPServerSocketCommunicationLayer() throws Exception {
         ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
                 configuration.getCorePoolSize(), configuration.getMaxPoolSize());
@@ -141,6 +147,8 @@ public class NettyConsumer extends Defau
         serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
 
         channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        // to keep track of all channels in use
+        allChannels.add(channel);
     }
 
     private void initializeUDPServerSocketCommunicationLayer() throws Exception {
@@ -159,6 +167,8 @@ public class NettyConsumer extends Defau
         connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
 
         channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        // to keep track of all channels in use
+        allChannels.add(channel);
     }
 
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May  6 12:40:14 2010
@@ -37,18 +37,23 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
 public class NettyProducer extends DefaultProducer implements ServicePoolAware {
     private static final transient Log LOG = LogFactory.getLog(NettyProducer.class);
+    private final ChannelGroup allChannels;
     private CamelContext context;
     private NettyConfiguration configuration;
     private CountDownLatch countdownLatch;
     private ChannelFactory channelFactory;
     private DatagramChannelFactory datagramChannelFactory;
     private ChannelFuture channelFuture;
+    private Channel channel;
     private ClientBootstrap clientBootstrap;
     private ConnectionlessBootstrap connectionlessClientBootstrap;
     private ClientPipelineFactory clientPipelineFactory;
@@ -58,6 +63,7 @@ public class NettyProducer extends Defau
         super(nettyEndpoint);
         this.configuration = configuration;
         this.context = this.getEndpoint().getCamelContext();
+        this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
     }
 
     @Override
@@ -87,9 +93,12 @@ public class NettyProducer extends Defau
         if (LOG.isDebugEnabled()) {
             LOG.debug("Stopping producer at address: " + configuration.getAddress());
         }
-        if (channelFuture != null) {
-            NettyHelper.close(channelFuture.getChannel());
-        }
+
+        // close all channels
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // and then release other resources
         if (channelFactory != null) {
             channelFactory.releaseExternalResources();
         }
@@ -103,7 +112,6 @@ public class NettyProducer extends Defau
         }
 
         // write the body
-        Channel channel = channelFuture.getChannel();
         NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
 
         if (configuration.isSync()) {
@@ -162,6 +170,9 @@ public class NettyProducer extends Defau
         if (!channelFuture.isSuccess()) {
             throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
         }
+        channel = channelFuture.getChannel();
+        // to keep track of all channels in use
+        allChannels.add(channel);
 
         LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress());
     }
@@ -195,6 +206,9 @@ public class NettyProducer extends Defau
         if (!channelFuture.isSuccess()) {
             throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
         }
+        channel = channelFuture.getChannel();
+        // to keep track of all channels in use
+        allChannels.add(channel);
 
         LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress());
     }
@@ -255,4 +269,7 @@ public class NettyProducer extends Defau
         this.clientPipeline = clientPipeline;
     }
 
+    public ChannelGroup getAllChannels() {
+        return allChannels;
+    }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May  6 12:40:14 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -39,6 +40,12 @@ public class ClientChannelHandler extend
     }
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+        // to keep track of open sockets
+        producer.getAllChannels().add(channelStateEvent.getChannel());
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May  6 12:40:14 2010
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -42,6 +43,12 @@ public class ServerChannelHandler extend
     }
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+        // to keep track of open sockets
+        consumer.getAllChannels().add(channelStateEvent.getChannel());
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
@@ -108,7 +115,7 @@ public class ServerChannelHandler extend
         } else {
             // we got a body to write
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing body" + body);
+                LOG.debug("Writing body: " + body);
             }
             if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
                 NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);