You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 14:40:14 UTC
svn commit: r941699 - in
/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty:
NettyConsumer.java NettyProducer.java handlers/ClientChannelHandler.java
handlers/ServerChannelHandler.java
Author: davsclaus
Date: Thu May 6 12:40:14 2010
New Revision: 941699
URL: http://svn.apache.org/viewvc?rev=941699&view=rev
Log:
CAMEL-2699: camel-netty now shuts down properly.
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 12:40:14 2010
@@ -20,7 +20,6 @@ import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.commons.logging.Log;
@@ -29,12 +28,16 @@ import org.jboss.netty.bootstrap.Connect
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
public class NettyConsumer extends DefaultConsumer {
private static final transient Log LOG = LogFactory.getLog(NettyConsumer.class);
+ private final ChannelGroup allChannels;
private CamelContext context;
private NettyConfiguration configuration;
private ChannelFactory channelFactory;
@@ -47,6 +50,7 @@ public class NettyConsumer extends Defau
super(nettyEndpoint, processor);
this.context = this.getEndpoint().getCamelContext();
this.configuration = configuration;
+ this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
}
@Override
@@ -72,20 +76,22 @@ public class NettyConsumer extends Defau
LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
}
-
- if (channel != null) {
- NettyHelper.close(channel);
+ // close all channels
+ ChannelGroupFuture future = allChannels.close();
+ future.awaitUninterruptibly();
+
+ // and then release other resources
+ if (channelFactory != null) {
+ channelFactory.releaseExternalResources();
}
- // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping
- // and then releasing channel factory would be faster
-// if (channelFactory != null) {
-// channelFactory.releaseExternalResources();
-// }
-
super.doStop();
}
-
+
+ public ChannelGroup getAllChannels() {
+ return allChannels;
+ }
+
public NettyConfiguration getConfiguration() {
return configuration;
}
@@ -108,8 +114,8 @@ public class NettyConsumer extends Defau
public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) {
this.datagramChannelFactory = datagramChannelFactory;
- }
-
+ }
+
public ServerBootstrap getServerBootstrap() {
return serverBootstrap;
}
@@ -124,8 +130,8 @@ public class NettyConsumer extends Defau
public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) {
this.connectionlessServerBootstrap = connectionlessServerBootstrap;
- }
-
+ }
+
private void initializeTCPServerSocketCommunicationLayer() throws Exception {
ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
@@ -141,6 +147,8 @@ public class NettyConsumer extends Defau
serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ // to keep track of all channels in use
+ allChannels.add(channel);
}
private void initializeUDPServerSocketCommunicationLayer() throws Exception {
@@ -159,6 +167,8 @@ public class NettyConsumer extends Defau
connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ // to keep track of all channels in use
+ allChannels.add(channel);
}
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 12:40:14 2010
@@ -37,18 +37,23 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
public class NettyProducer extends DefaultProducer implements ServicePoolAware {
private static final transient Log LOG = LogFactory.getLog(NettyProducer.class);
+ private final ChannelGroup allChannels;
private CamelContext context;
private NettyConfiguration configuration;
private CountDownLatch countdownLatch;
private ChannelFactory channelFactory;
private DatagramChannelFactory datagramChannelFactory;
private ChannelFuture channelFuture;
+ private Channel channel;
private ClientBootstrap clientBootstrap;
private ConnectionlessBootstrap connectionlessClientBootstrap;
private ClientPipelineFactory clientPipelineFactory;
@@ -58,6 +63,7 @@ public class NettyProducer extends Defau
super(nettyEndpoint);
this.configuration = configuration;
this.context = this.getEndpoint().getCamelContext();
+ this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
}
@Override
@@ -87,9 +93,12 @@ public class NettyProducer extends Defau
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping producer at address: " + configuration.getAddress());
}
- if (channelFuture != null) {
- NettyHelper.close(channelFuture.getChannel());
- }
+
+ // close all channels
+ ChannelGroupFuture future = allChannels.close();
+ future.awaitUninterruptibly();
+
+ // and then release other resources
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
@@ -103,7 +112,6 @@ public class NettyProducer extends Defau
}
// write the body
- Channel channel = channelFuture.getChannel();
NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
if (configuration.isSync()) {
@@ -162,6 +170,9 @@ public class NettyProducer extends Defau
if (!channelFuture.isSuccess()) {
throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
}
+ channel = channelFuture.getChannel();
+ // to keep track of all channels in use
+ allChannels.add(channel);
LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress());
}
@@ -195,6 +206,9 @@ public class NettyProducer extends Defau
if (!channelFuture.isSuccess()) {
throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
}
+ channel = channelFuture.getChannel();
+ // to keep track of all channels in use
+ allChannels.add(channel);
LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress());
}
@@ -255,4 +269,7 @@ public class NettyProducer extends Defau
this.clientPipeline = clientPipeline;
}
+ public ChannelGroup getAllChannels() {
+ return allChannels;
+ }
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 12:40:14 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -39,6 +40,12 @@ public class ClientChannelHandler extend
}
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+ // to keep track of open sockets
+ producer.getAllChannels().add(channelStateEvent.getChannel());
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 12:40:14 2010
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -42,6 +43,12 @@ public class ServerChannelHandler extend
}
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+ // to keep track of open sockets
+ consumer.getAllChannels().add(channelStateEvent.getChannel());
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
@@ -108,7 +115,7 @@ public class ServerChannelHandler extend
} else {
// we got a body to write
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing body" + body);
+ LOG.debug("Writing body: " + body);
}
if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);