You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 13:13:42 UTC
svn commit: r941661 - in /camel/trunk/components:
camel-mina/src/main/java/org/apache/camel/component/mina/
camel-netty/src/main/java/org/apache/camel/component/netty/
camel-netty/src/main/java/org/apache/camel/component/netty/handlers/
camel-netty/src...
Author: davsclaus
Date: Thu May 6 11:13:41 2010
New Revision: 941661
URL: http://svn.apache.org/viewvc?rev=941661&view=rev
Log:
CAMEL-2699: Improve camel-netty to shut down properly. Also add features which we have in camel-mina but which weren't ported to camel-netty yet.
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java (with props)
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
camel/trunk/components/camel-netty/src/test/resources/log4j.properties
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu May 6 11:13:41 2010
@@ -105,7 +105,12 @@ public class MinaConsumer extends Defaul
if (endpoint.getConfiguration().getCharsetName() != null) {
exchange.setProperty(Exchange.CHARSET_NAME, endpoint.getConfiguration().getCharsetName());
}
- getProcessor().process(exchange);
+
+ try {
+ getProcessor().process(exchange);
+ } catch (Throwable e) {
+ getExceptionHandler().handleException(e);
+ }
// if sync then we should return a response
if (sync) {
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java Thu May 6 11:13:41 2010
@@ -39,7 +39,7 @@ public final class MinaHelper {
*
* @param session the MINA session
* @param body the body to write (send)
- * @param exchange the mina exchange used for error reporting
+ * @param exchange the exchange
* @throws CamelExchangeException is thrown if the body could not be written for some reasons
* (eg remote connection is closed etc.)
*/
@@ -48,6 +48,9 @@ public final class MinaHelper {
WriteFuture future = session.write(body);
// must use a timeout (we use 10s) as in some very high performance scenarios a write can cause
// thread hanging forever
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for write to complete");
+ }
future.join(10 * 1000L);
if (!future.isWritten()) {
LOG.warn("Cannot write body: " + body + " using session: " + session);
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 11:13:41 2010
@@ -57,6 +57,7 @@ public class NettyConfiguration {
private int maxPoolSize;
private String keyStoreFormat;
private String securityProvider;
+ private boolean disconnect;
public NettyConfiguration() {
setKeepAlive(true);
@@ -139,6 +140,9 @@ public class NettyConfiguration {
if (settings.containsKey("maxPoolSize")) {
setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
}
+ if (settings.containsKey("disconnect")) {
+ setDisconnect(Boolean.valueOf((String) settings.get("disconnect")));
+ }
}
public String getProtocol() {
@@ -221,7 +225,6 @@ public class NettyConfiguration {
this.sslHandler = sslHandler;
}
-
public List<ChannelDownstreamHandler> getEncoders() {
return encoders;
}
@@ -354,6 +357,18 @@ public class NettyConfiguration {
this.securityProvider = securityProvider;
}
+ public boolean isDisconnect() {
+ return disconnect;
+ }
+
+ public void setDisconnect(boolean disconnect) {
+ this.disconnect = disconnect;
+ }
+
+ public String getAddress() {
+ return host + ":" + port;
+ }
+
private <T> void addToHandlersList(List configured, List handlers, Class<? extends T> handlerType) {
if (handlers != null) {
for (int x = 0; x < handlers.size(); x++) {
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May 6 11:13:41 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+/**
+ * Netty constants
+ *
+ * @version $Revision$
+ */
+public final class NettyConstants {
+
+ public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
+ public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
+ public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+
+ private NettyConstants() {
+ // Utility class
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 11:13:41 2010
@@ -20,12 +20,14 @@ import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -39,12 +41,17 @@ public class NettyConsumer extends Defau
private DatagramChannelFactory datagramChannelFactory;
private ServerBootstrap serverBootstrap;
private ConnectionlessBootstrap connectionlessServerBootstrap;
-
- public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor,
- NettyConfiguration configuration) {
+ private Channel channel;
+
+ public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
super(nettyEndpoint, processor);
- this.configuration = nettyEndpoint.getConfiguration();
this.context = this.getEndpoint().getCamelContext();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public NettyEndpoint getEndpoint() {
+ return (NettyEndpoint) super.getEndpoint();
}
@Override
@@ -55,45 +62,28 @@ public class NettyConsumer extends Defau
} else {
initializeTCPServerSocketCommunicationLayer();
}
+
+ LOG.info("Netty consumer bound to: " + configuration.getAddress());
}
@Override
protected void doStop() throws Exception {
- super.doStop();
- }
-
- private void initializeTCPServerSocketCommunicationLayer() throws Exception {
- ExecutorService bossExecutor =
- context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
- context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
- channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
- serverBootstrap = new ServerBootstrap(channelFactory);
-
- serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
- serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
- serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
- serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
- serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- LOG.info("Netty TCP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
+ }
- private void initializeUDPServerSocketCommunicationLayer() throws Exception {
- ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
- datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
- connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
-
- connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
- connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
- connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
- connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
- connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
- connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
- connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
- connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- LOG.info("Netty UDP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
+
+ if (channel != null) {
+ NettyHelper.close(channel);
+ }
+
+ // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping
+ // and then releasing channel factory would be faster
+// if (channelFactory != null) {
+// channelFactory.releaseExternalResources();
+// }
+
+ super.doStop();
}
public NettyConfiguration getConfiguration() {
@@ -112,13 +102,11 @@ public class NettyConsumer extends Defau
this.channelFactory = channelFactory;
}
-
public DatagramChannelFactory getDatagramChannelFactory() {
return datagramChannelFactory;
}
- public void setDatagramChannelFactory(
- DatagramChannelFactory datagramChannelFactory) {
+ public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) {
this.datagramChannelFactory = datagramChannelFactory;
}
@@ -134,9 +122,43 @@ public class NettyConsumer extends Defau
return connectionlessServerBootstrap;
}
- public void setConnectionlessServerBootstrap(
- ConnectionlessBootstrap connectionlessServerBootstrap) {
+ public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) {
this.connectionlessServerBootstrap = connectionlessServerBootstrap;
}
+ private void initializeTCPServerSocketCommunicationLayer() throws Exception {
+ ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+ ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+
+ channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+ serverBootstrap = new ServerBootstrap(channelFactory);
+ serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+ serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+ serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+ serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+ serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+
+ channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ }
+
+ private void initializeUDPServerSocketCommunicationLayer() throws Exception {
+ ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+
+ datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+ connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+ connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+ connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+ connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+ connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+ connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+ connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+ connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+ connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+
+ channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ }
+
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May 6 11:13:41 2010
@@ -43,13 +43,13 @@ public class NettyEndpoint extends Defau
public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) {
Exchange exchange = createExchange();
- exchange.getIn().setHeader("NettyChannelHandlerContext", ctx);
- exchange.getIn().setHeader("NettyMessageEvent", messageEvent);
+ exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
+ exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
return exchange;
}
public boolean isSingleton() {
- return false;
+ return true;
}
public NettyConfiguration getConfiguration() {
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Thu May 6 11:13:41 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.netty;
+import java.net.SocketAddress;
+
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.commons.logging.Log;
@@ -45,17 +47,34 @@ public final class NettyHelper {
* @throws CamelExchangeException is thrown if the body could not be written for some reasons
* (eg remote connection is closed etc.)
*/
- public static void writeBody(Channel channel, Object body, Exchange exchange) throws CamelExchangeException {
+ public static void writeBody(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
// the write operation is asynchronous. Use future to wait until the session has been written
- ChannelFuture future = channel.write(body);
+ ChannelFuture future;
+ if (remoteAddress != null) {
+ future = channel.write(body, remoteAddress);
+ } else {
+ future = channel.write(body);
+ }
// wait for the write
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for write to complete");
+ }
future.awaitUninterruptibly();
// if it was not a success then thrown an exception
if (future.isSuccess() == false) {
LOG.warn("Cannot write body: " + body + " using channel: " + channel);
- throw new CamelExchangeException("Cannot write body", exchange);
+ throw new CamelExchangeException("Cannot write body", exchange, future.getCause());
+ }
+ }
+
+ public static void close(Channel channel) {
+ if (channel != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing channel: " + channel);
+ }
+ channel.close().awaitUninterruptibly();
}
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 11:13:41 2010
@@ -22,11 +22,13 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -56,7 +58,19 @@ public class NettyProducer extends Defau
super(nettyEndpoint);
this.configuration = configuration;
this.context = this.getEndpoint().getCamelContext();
- }
+ }
+
+ @Override
+ public NettyEndpoint getEndpoint() {
+ return (NettyEndpoint) super.getEndpoint();
+ }
+
+ @Override
+ public boolean isSingleton() {
+ // the producer should not be singleton otherwise cannot use concurrent producers and safely
+ // use request/reply with correct correlation
+ return false;
+ }
@Override
protected void doStart() throws Exception {
@@ -70,14 +84,17 @@ public class NettyProducer extends Defau
@Override
protected void doStop() throws Exception {
- super.doStop();
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping producer at address: " + configuration.getAddress());
+ }
+ if (channelFuture != null) {
+ NettyHelper.close(channelFuture.getChannel());
+ }
+ if (channelFactory != null) {
+ channelFactory.releaseExternalResources();
+ }
- @Override
- public boolean isSingleton() {
- // the producer should not be singleton otherwise cannot use concurrent producers and safely
- // use request/reply with correct correlation
- return false;
+ super.doStop();
}
public void process(Exchange exchange) throws Exception {
@@ -87,7 +104,7 @@ public class NettyProducer extends Defau
// write the body
Channel channel = channelFuture.getChannel();
- NettyHelper.writeBody(channel, exchange.getIn().getBody(), exchange);
+ NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
if (configuration.isSync()) {
boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
@@ -96,21 +113,35 @@ public class NettyProducer extends Defau
}
Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse();
exchange.getOut().setBody(response);
- }
+ }
+
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
+ }
+ NettyHelper.close(channel);
+ }
}
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
- ExecutorService bossExecutor =
- context.getExecutorServiceStrategy().newThreadPool(this,
- "NettyTCPBoss",
- configuration.getCorePoolSize(),
- configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
- context.getExecutorServiceStrategy().newThreadPool(this,
- "NettyTCPWorker",
- configuration.getCorePoolSize(),
- configuration.getMaxPoolSize());
+ ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+ ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
}
if (clientBootstrap == null) {
@@ -125,18 +156,20 @@ public class NettyProducer extends Defau
clientPipeline = clientPipelineFactory.getPipeline();
clientBootstrap.setPipeline(clientPipeline);
}
- channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+
+ channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
channelFuture.awaitUninterruptibly();
- LOG.info("Netty TCP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort());
+ if (!channelFuture.isSuccess()) {
+ throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+ }
+
+ LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress());
}
-
+
protected void setupUDPCommunication() throws Exception {
if (datagramChannelFactory == null) {
- ExecutorService workerExecutor =
- context.getExecutorServiceStrategy().newThreadPool(this,
- "NettyUDPWorker",
- configuration.getCorePoolSize(),
- configuration.getMaxPoolSize());
+ ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker",
+ configuration.getCorePoolSize(), configuration.getMaxPoolSize());
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
}
if (connectionlessClientBootstrap == null) {
@@ -155,12 +188,17 @@ public class NettyProducer extends Defau
clientPipeline = clientPipelineFactory.getPipeline();
connectionlessClientBootstrap.setPipeline(clientPipeline);
}
+
connectionlessClientBootstrap.bind(new InetSocketAddress(0));
- channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
channelFuture.awaitUninterruptibly();
- LOG.info("Netty UDP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort());
- }
-
+ if (!channelFuture.isSuccess()) {
+ throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+ }
+
+ LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress());
+ }
+
public NettyConfiguration getConfiguration() {
return configuration;
}
@@ -216,5 +254,5 @@ public class NettyProducer extends Defau
public void setClientPipeline(ChannelPipeline clientPipeline) {
this.clientPipeline = clientPipeline;
}
-
+
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 11:13:41 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.netty.handlers;
+import org.apache.camel.CamelException;
+import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.NettyProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,20 +39,25 @@ public class ClientChannelHandler extend
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("An exception was caught by the ClientChannelHandler during communication", exceptionEvent.getCause());
+ LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
}
+ // close channel in case an exception was thrown
+ NettyHelper.close(exceptionEvent.getChannel());
+
+ // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+ throw new CamelException(exceptionEvent.getCause());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
- throws Exception {
- response = messageEvent.getMessage();
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+ setResponse(messageEvent.getMessage());
+
if (LOG.isDebugEnabled()) {
LOG.debug("Incoming message:" + response);
}
+
if (producer.getConfiguration().isSync()) {
producer.getCountdownLatch().countDown();
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 11:13:41 2010
@@ -16,17 +16,15 @@
*/
package org.apache.camel.component.netty.handlers;
-import java.net.InetSocketAddress;
-
-import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.component.netty.NettyConsumer;
-import org.apache.camel.component.netty.NettyEndpoint;
+import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ExceptionEvent;
@@ -44,37 +42,55 @@ public class ServerChannelHandler extend
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("An exception was caught by the ServerChannelHandler during communication", exceptionEvent.getCause());
+ LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
}
+ // close channel in case an exception was thrown
+ NettyHelper.close(exceptionEvent.getChannel());
+
+ // must wrap and rethrow, since the cause can be any Throwable and we may only throw an Exception
+ throw new CamelException(exceptionEvent.getCause());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
- throws Exception {
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
Object in = messageEvent.getMessage();
if (LOG.isDebugEnabled()) {
if (in instanceof byte[]) {
+ // a byte array is not readable, so convert it to a String
in = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
}
LOG.debug("Incoming message: " + in);
}
- // Dispatch exchange along the route and receive the final resulting exchange
- dispatchExchange(ctx, messageEvent, in);
+ // create Exchange and let the consumer process it
+ Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+ if (consumer.getConfiguration().isSync()) {
+ exchange.setPattern(ExchangePattern.InOut);
+ }
+ exchange.getIn().setBody(in);
+
+ try {
+ consumer.getProcessor().process(exchange);
+ } catch (Throwable e) {
+ consumer.getExceptionHandler().handleException(e);
+ }
+
+ // send back response if the communication is synchronous
+ if (consumer.getConfiguration().isSync()) {
+ sendResponse(messageEvent, exchange);
+ }
}
- private void sendResponsetoChannel(MessageEvent messageEvent, Exchange exchange) throws Exception {
- ChannelFuture future;
+ private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {
Object body;
if (ExchangeHelper.isOutCapable(exchange)) {
body = exchange.getOut().getBody();
} else {
body = exchange.getIn().getBody();
}
-
+
if (exchange.isFailed()) {
if (exchange.getException() == null) {
// fault detected
@@ -83,54 +99,43 @@ public class ServerChannelHandler extend
body = exchange.getException();
}
}
-
+
if (body == null) {
- LOG.warn("No Oubound Response received following route completion: " + exchange);
- LOG.warn("A response cannot be sent to the Client");
- messageEvent.getChannel().close();
- }
-
- if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
- future = messageEvent.getChannel().write(body, messageEvent.getRemoteAddress());
+ // must close the channel if there is no data to write, otherwise the client will never
+ // receive a response and will wait forever (unless it times out)
+ LOG.warn("Cannot write body since its null, closing channel: " + exchange);
+ NettyHelper.close(messageEvent.getChannel());
} else {
- future = messageEvent.getChannel().write(body);
- }
-
- if (!future.isSuccess()) {
- String hostname = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getHostName();
- int port = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getPort();
- throw new CamelExchangeException("Could not send response via Channel to remote host " + hostname + " and port " + port, exchange);
- }
-
- if (LOG.isDebugEnabled()) {
- if (body instanceof byte[]) {
- body = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body);
+ // we got a body to write
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing body" + body);
+ }
+ if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
+ NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
+ } else {
+ NettyHelper.writeBody(messageEvent.getChannel(), null, body, exchange);
}
- LOG.debug("Sent Outgoing message: " + body);
- }
- }
-
- private void dispatchExchange(ChannelHandlerContext ctx, MessageEvent messageEvent, Object in) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Consumer Dispatching the Incoming exchange along the route");
}
- Exchange exchange = ((NettyEndpoint)consumer.getEndpoint()).createExchange(ctx, messageEvent);
- if (consumer.getConfiguration().isSync()) {
- exchange.setPattern(ExchangePattern.InOut);
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
}
- exchange.getIn().setBody(in);
-
- try {
- consumer.getProcessor().process(exchange);
- } catch (Exception exception) {
- throw new CamelExchangeException("Error in consumer while dispatching exchange for further processing", exchange);
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = consumer.getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
}
-
- // Send back response if the communication is synchronous
- if (consumer.getConfiguration().isSync()) {
- sendResponsetoChannel(messageEvent, exchange);
+ if (disconnect) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel when complete at address: " + messageEvent.getRemoteAddress());
+ }
+ NettyHelper.close(messageEvent.getChannel());
}
}
-
+
}
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java Thu May 6 11:13:41 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyDisconnectTest extends CamelTestSupport {
+
+ private String uri = "netty:tcp://localhost:8080?sync=true&disconnect=true";
+
+ @Test
+ public void testCloseSessionWhenComplete() throws Exception {
+ Object out = template.requestBody(uri, "Claus");
+ assertEquals("Bye Claus", out);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(uri).process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ exchange.getOut().setBody("Bye " + body);
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java?rev=941661&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java Thu May 6 11:13:41 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyInOutCloseChannelWhenCompleteTest extends CamelTestSupport {
+
+ @Test
+ public void testCloseSessionWhenComplete() throws Exception {
+ Object out = template.requestBody("netty:tcp://localhost:8080?sync=true", "Claus");
+ assertEquals("Bye Claus", out);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:8080?sync=true").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ exchange.getOut().setBody("Bye " + body);
+ exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=941661&r1=941660&r2=941661&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Thu May 6 11:13:41 2010
@@ -18,12 +18,12 @@
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
-log4j.rootLogger=DEBUG, file
+log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
log4j.logger.org.apache.camel.component.netty=DEBUG
-log4j.logger.org.apache.camel=DEBUG
-log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.commons.net=TRACE
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender