You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/05 17:17:22 UTC
svn commit: r960621 - in /camel/trunk/components:
camel-mina/src/main/java/org/apache/camel/component/mina/
camel-mina/src/test/java/org/apache/camel/component/mina/
camel-mina/src/test/resources/
camel-netty/src/main/java/org/apache/camel/component/ne...
Author: davsclaus
Date: Mon Jul 5 15:17:21 2010
New Revision: 960621
URL: http://svn.apache.org/viewvc?rev=960621&view=rev
Log:
CAMEL-2907: NettyProducer supports async routing engine. CAMEL-2908: Added textline option to Netty.
Added:
camel/trunk/components/camel-netty/src/test/data/
camel/trunk/components/camel-netty/src/test/data/message1.txt (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java (with props)
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
camel/trunk/components/camel-mina/src/test/resources/log4j.properties
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
camel/trunk/components/camel-netty/src/test/resources/log4j.properties
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java Mon Jul 5 15:17:21 2010
@@ -70,7 +70,6 @@ public class MinaConfiguration implement
return Charset.forName(encoding).name();
}
-
public String getProtocol() {
return protocol;
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Mon Jul 5 15:17:21 2010
@@ -89,7 +89,7 @@ public class MinaProducer extends Defaul
// if textline enabled then covert to a String which must be used for textline
if (endpoint.getConfiguration().isTextline()) {
- body = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, exchange, body);
+ body = endpoint.getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
}
// if sync is true then we should also wait for a response (synchronous mode)
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java Mon Jul 5 15:17:21 2010
@@ -39,7 +39,7 @@ public class MinaVmTest extends ContextT
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from(uri).to("mock:result");
+ from(uri).to("log:before?showAll=true").to("mock:result").to("log:after?showAll=true");
}
};
}
Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Mon Jul 5 15:17:21 2010
@@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, stdout
#log4j.logger.org.apache.camel.component.mina=DEBUG
#log4j.logger.org.apache.camel=DEBUG
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Mon Jul 5 15:17:21 2010
@@ -17,8 +17,11 @@
package org.apache.camel.component.netty;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
import org.apache.commons.logging.Log;
@@ -29,33 +32,25 @@ import org.jboss.netty.channel.ChannelPi
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
public class ClientPipelineFactory implements ChannelPipelineFactory {
private static final transient Log LOG = LogFactory.getLog(ClientPipelineFactory.class);
- private NettyProducer producer;
- private ChannelPipeline channelPipeline;
+ private final NettyProducer producer;
+ private final Exchange exchange;
+ private final AsyncCallback callback;
- public ClientPipelineFactory(NettyProducer producer) {
+ public ClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) {
this.producer = producer;
+ this.exchange = exchange;
+ this.callback = callback;
}
public ChannelPipeline getPipeline() throws Exception {
- if (channelPipeline != null) {
- // http://docs.jboss.org/netty/3.1/api/org/jboss/netty/handler/ssl/SslHandler.html
- // To restart the SSL session, you must remove the existing closed SslHandler
- // from the ChannelPipeline, insert a new SslHandler with a new SSLEngine into
- // the pipeline, and start the handshake process as described in the first section.
- if (channelPipeline.remove("ssl") != null) {
- // reinitialize and add SSL first
- if (LOG.isDebugEnabled()) {
- LOG.debug("Client SSL handler re-initialized on the ChannelPipeline");
- }
- channelPipeline.addFirst("ssl", configureClientSSLOnDemand());
- }
- return channelPipeline;
- }
-
- channelPipeline = Channels.pipeline();
+ // create a new pipeline
+ ChannelPipeline channelPipeline = Channels.pipeline();
SslHandler sslHandler = configureClientSSLOnDemand();
if (sslHandler != null) {
@@ -65,6 +60,12 @@ public class ClientPipelineFactory imple
channelPipeline.addLast("ssl", sslHandler);
}
+ // use read timeout handler to handle timeout while waiting for a remote reply (while reading from the remote host)
+ if (producer.getConfiguration().getTimeout() > 0) {
+ Timer timer = new HashedWheelTimer();
+ channelPipeline.addLast("timeout", new ReadTimeoutHandler(timer, producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS));
+ }
+
List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders();
for (int x = 0; x < decoders.size(); x++) {
channelPipeline.addLast("decoder-" + x, decoders.get(x));
@@ -75,11 +76,8 @@ public class ClientPipelineFactory imple
channelPipeline.addLast("encoder-" + x, encoders.get(x));
}
- if (producer.getConfiguration().getHandler() != null) {
- channelPipeline.addLast("handler", producer.getConfiguration().getHandler());
- } else {
- channelPipeline.addLast("handler", new ClientChannelHandler(producer));
- }
+ // our handler must be added last
+ channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));
return channelPipeline;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Mon Jul 5 15:17:21 2010
@@ -18,23 +18,29 @@ package org.apache.camel.component.netty
import java.io.File;
import java.net.URI;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.URISupport;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelDownstreamHandler;
-import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.CharsetUtil;
@SuppressWarnings("unchecked")
public class NettyConfiguration implements Cloneable {
+ private static final transient Log LOG = LogFactory.getLog(NettyConfiguration.class);
+
private String protocol;
private String host;
private int port;
@@ -45,13 +51,14 @@ public class NettyConfiguration implemen
private long timeout = 30000;
private boolean reuseAddress = true;
private boolean sync = true;
+ private boolean textline;
+ private String encoding;
private String passphrase;
private File keyStoreFile;
private File trustStoreFile;
private SslHandler sslHandler;
private List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
- private ChannelHandler handler;
private boolean ssl;
private long sendBufferSize = 65536;
private long receiveBufferSize = 65536;
@@ -99,70 +106,53 @@ public class NettyConfiguration implemen
keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null);
trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null);
+ // set custom encoders and decoders first
List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelDownstreamHandler.class, null);
addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class);
List<ChannelUpstreamHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelUpstreamHandler.class, null);
addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class);
+ // then set parameters with the help of the camel context type converters
+ EndpointHelper.setProperties(component.getCamelContext(), this, parameters);
+
+ // add default encoders and decoders
if (encoders.isEmpty() && decoders.isEmpty()) {
- encoders.add(component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder()));
- decoders.add(component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder()));
- }
+ // are we textline or object?
+ if (isTextline()) {
+ Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
+ encoders.add(new StringEncoder(charset));
+ decoders.add(new StringDecoder(charset));
- handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using textline encoders and decoders with charset: " + charset);
+ }
+ } else {
+ // object serializable is then used
+ encoders.add(new ObjectEncoder());
+ decoders.add(new ObjectDecoder());
- Map<String, Object> settings = URISupport.parseParameters(uri);
- if (settings.containsKey("keepAlive")) {
- setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive")));
- }
- if (settings.containsKey("tcpNoDelay")) {
- setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay")));
- }
- if (settings.containsKey("broadcast")) {
- setBroadcast(Boolean.valueOf((String) settings.get("broadcast")));
- }
- if (settings.containsKey("reuseAddress")) {
- setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
- }
- if (settings.containsKey("connectTimeoutMillis")) {
- setConnectTimeout(Long.valueOf((String) settings.get("connectTimeoutMillis")));
- }
- if (settings.containsKey("sync")) {
- setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
- }
- if (settings.containsKey("receiveTimeoutMillis")) {
- setTimeout(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
- }
- if (settings.containsKey("sendBufferSize")) {
- setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
- }
- if (settings.containsKey("receiveBufferSize")) {
- setReceiveBufferSize(Long.valueOf((String) settings.get("receiveBufferSize")));
- }
- if (settings.containsKey("ssl")) {
- setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl")));
- }
- if (settings.containsKey("corePoolSize")) {
- setCorePoolSize(Integer.valueOf((String) settings.get("corePoolSize")));
- }
- if (settings.containsKey("maxPoolSize")) {
- setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
- }
- if (settings.containsKey("disconnect")) {
- setDisconnect(Boolean.valueOf((String) settings.get("disconnect")));
- }
- if (settings.containsKey("lazyChannelCreation")) {
- setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation")));
- }
- if (settings.containsKey("transferExchange")) {
- setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using object encoders and decoders");
+ }
+ }
+ } else {
+ LOG.debug("Using configured encoders and/or decoders");
}
- if (settings.containsKey("disconnectOnNoReply")) {
- setDisconnectOnNoReply(Boolean.valueOf((String) settings.get("disconnectOnNoReply")));
+ }
+
+ public String getCharsetName() {
+ if (encoding == null) {
+ return null;
}
- if (settings.containsKey("noReplyLogLevel")) {
- setNoReplyLogLevel(LoggingLevel.valueOf((String) settings.get("noReplyLogLevel")));
+ if (!Charset.isSupported(encoding)) {
+ throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
}
+
+ return Charset.forName(encoding).name();
+ }
+
+ protected boolean isTcp() {
+ return protocol.equalsIgnoreCase("tcp");
}
public String getProtocol() {
@@ -237,6 +227,22 @@ public class NettyConfiguration implemen
this.sync = sync;
}
+ public boolean isTextline() {
+ return textline;
+ }
+
+ public void setTextline(boolean textline) {
+ this.textline = textline;
+ }
+
+ public String getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
public SslHandler getSslHandler() {
return sslHandler;
}
@@ -281,14 +287,6 @@ public class NettyConfiguration implemen
this.decoders = decoders;
}
- public ChannelHandler getHandler() {
- return handler;
- }
-
- public void setHandler(ChannelHandler handler) {
- this.handler = handler;
- }
-
public long getTimeout() {
return timeout;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Mon Jul 5 15:17:21 2010
@@ -60,11 +60,15 @@ public class NettyConsumer extends Defau
@Override
protected void doStart() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Netty consumer binding to: " + configuration.getAddress());
+ }
+
super.doStart();
- if (configuration.getProtocol().equalsIgnoreCase("udp")) {
- initializeUDPServerSocketCommunicationLayer();
- } else {
+ if (isTcp()) {
initializeTCPServerSocketCommunicationLayer();
+ } else {
+ initializeUDPServerSocketCommunicationLayer();
}
LOG.info("Netty consumer bound to: " + configuration.getAddress());
@@ -72,8 +76,8 @@ public class NettyConsumer extends Defau
@Override
protected void doStop() throws Exception {
- if (LOG.isInfoEnabled()) {
- LOG.info("Netty consumer unbinding from: " + configuration.getAddress());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Netty consumer unbinding from: " + configuration.getAddress());
}
// close all channels
@@ -86,6 +90,12 @@ public class NettyConsumer extends Defau
}
super.doStop();
+
+ LOG.info("Netty consumer unbound from: " + configuration.getAddress());
+ }
+
+ public CamelContext getContext() {
+ return context;
}
public ChannelGroup getAllChannels() {
@@ -132,6 +142,10 @@ public class NettyConsumer extends Defau
this.connectionlessServerBootstrap = connectionlessServerBootstrap;
}
+ protected boolean isTcp() {
+ return configuration.getProtocol().equalsIgnoreCase("tcp");
+ }
+
private void initializeTCPServerSocketCommunicationLayer() throws Exception {
ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss",
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Mon Jul 5 15:17:21 2010
@@ -47,7 +47,7 @@ public final class NettyHelper {
* @throws CamelExchangeException is thrown if the body could not be written for some reasons
* (eg remote connection is closed etc.)
*/
- public static void writeBody(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
+ public static void writeBodySync(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException {
// the write operation is asynchronous. Use future to wait until the session has been written
ChannelFuture future;
if (remoteAddress != null) {
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Mon Jul 5 15:17:21 2010
@@ -17,18 +17,17 @@
package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelException;
-import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.ServicePoolAware;
-import org.apache.camel.component.netty.handlers.ClientChannelHandler;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.processor.Logger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
@@ -38,6 +37,7 @@ import org.jboss.netty.bootstrap.Connect
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -46,26 +46,19 @@ import org.jboss.netty.channel.socket.Da
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-public class NettyProducer extends DefaultProducer implements ServicePoolAware {
+public class NettyProducer extends DefaultAsyncProducer implements ServicePoolAware {
private static final transient Log LOG = LogFactory.getLog(NettyProducer.class);
- private final ChannelGroup allChannels;
+ private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer");
private CamelContext context;
private NettyConfiguration configuration;
- private CountDownLatch countdownLatch;
private ChannelFactory channelFactory;
private DatagramChannelFactory datagramChannelFactory;
- private Channel channel;
- private ClientBootstrap clientBootstrap;
- private ConnectionlessBootstrap connectionlessClientBootstrap;
- private ClientPipelineFactory clientPipelineFactory;
- private ChannelPipeline clientPipeline;
private Logger noReplyLogger;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
super(nettyEndpoint);
this.configuration = configuration;
this.context = this.getEndpoint().getCamelContext();
- this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
this.noReplyLogger = new Logger(LOG, configuration.getNoReplyLogLevel());
}
@@ -81,17 +74,27 @@ public class NettyProducer extends Defau
return false;
}
+ public CamelContext getContext() {
+ return context;
+ }
+
+ protected boolean isTcp() {
+ return configuration.getProtocol().equalsIgnoreCase("tcp");
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
- if (configuration.getProtocol().equalsIgnoreCase("udp")) {
- setupUDPCommunication();
- } else {
+ if (isTcp()) {
setupTCPCommunication();
+ } else {
+ setupUDPCommunication();
}
+
if (!configuration.isLazyChannelCreation()) {
- openConnection();
+ // ensure the connection can be established when we start up
+ openAndCloseConnection();
}
}
@@ -100,83 +103,111 @@ public class NettyProducer extends Defau
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping producer at address: " + configuration.getAddress());
}
- closeConnection();
+ // close all channels
+ ChannelGroupFuture future = ALL_CHANNELS.close();
+ future.awaitUninterruptibly();
+
+ // and then release other resources
+ if (channelFactory != null) {
+ channelFactory.releaseExternalResources();
+ }
super.doStop();
}
- public void process(Exchange exchange) throws Exception {
- if (channel == null && !configuration.isLazyChannelCreation()) {
- throw new IllegalStateException("Not started yet!");
- }
- if (channel == null || !channel.isConnected()) {
- openConnection();
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ if (!isRunAllowed()) {
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(true);
+ return true;
}
Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
if (body == null) {
noReplyLogger.log("No payload to send for exchange: " + exchange);
- return; // exit early since nothing to write
+ callback.done(true);
+ return true;
+ }
+ // if textline enabled then covert to a String which must be used for textline
+ if (getConfiguration().isTextline()) {
+ try {
+ body = context.getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+ } catch (NoTypeConversionAvailableException e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ }
+
+ // set the exchange encoding property
+ if (getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, getConfiguration().getCharsetName());
}
- if (configuration.isSync()) {
- // only initialize latch if we should get a response
- countdownLatch = new CountDownLatch(1);
+ ChannelFuture channelFuture;
+ final Channel channel;
+ try {
+ channelFuture = openConnection(exchange, callback);
+ channel = openChannel(channelFuture);
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
// log what we are writing
if (LOG.isDebugEnabled()) {
- Object out = body;
- if (body instanceof byte[]) {
- // byte arrays is not readable so convert to string
- out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
- }
- LOG.debug("Writing body : " + out);
+ LOG.debug("Writing body: " + body);
}
+ // write the body asynchronously
+ ChannelFuture future = channel.write(body);
- // write the body
- NettyHelper.writeBody(channel, null, body, exchange);
-
- if (configuration.isSync()) {
- boolean success = countdownLatch.await(configuration.getTimeout(), TimeUnit.MILLISECONDS);
- if (!success) {
- throw new ExchangeTimedOutException(exchange, configuration.getTimeout());
- }
+ // add listener which handles the operation
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation complete " + channelFuture);
+ }
+ if (!channelFuture.isSuccess()) {
+ // no success the set the caused exception and signal callback and break
+ exchange.setException(channelFuture.getCause());
+ callback.done(false);
+ return;
+ }
- ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
- if (handler.getCause() != null) {
- throw new CamelExchangeException("Error occurred in ClientChannelHandler", exchange, handler.getCause());
- } else if (!handler.isMessageReceived()) {
- // no message received
- throw new CamelExchangeException("No response received from remote server: " + configuration.getAddress(), exchange);
- } else {
- // set the result on either IN or OUT on the original exchange depending on its pattern
- if (ExchangeHelper.isOutCapable(exchange)) {
- NettyPayloadHelper.setOut(exchange, handler.getMessage());
- } else {
- NettyPayloadHelper.setIn(exchange, handler.getMessage());
+ // if we do not expect any reply then signal callback to continue routing
+ if (!configuration.isSync()) {
+ try {
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
+ }
+ NettyHelper.close(channel);
+ }
+ } finally {
+ // signal callback to continue routing
+ callback.done(false);
+ }
}
}
- }
-
- // should channel be closed after complete?
- Boolean close;
- if (ExchangeHelper.isOutCapable(exchange)) {
- close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
- } else {
- close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
- }
+ });
- // should we disconnect, the header can override the configuration
- boolean disconnect = getConfiguration().isDisconnect();
- if (close != null) {
- disconnect = close;
- }
- if (disconnect) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress());
- }
- NettyHelper.close(channel);
- }
+ // continue routing asynchronously
+ return false;
}
protected void setupTCPCommunication() throws Exception {
@@ -187,13 +218,6 @@ public class NettyProducer extends Defau
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
}
- if (clientBootstrap == null) {
- clientBootstrap = new ClientBootstrap(channelFactory);
- clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
- clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
- clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
- }
}
protected void setupUDPCommunication() throws Exception {
@@ -202,8 +226,29 @@ public class NettyProducer extends Defau
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
}
- if (connectionlessClientBootstrap == null) {
- connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+ }
+
+ private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws Exception {
+ ChannelFuture answer;
+
+ // initialize client pipeline factory
+ ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(this, exchange, callback);
+ // must get the pipeline from the factory when opening a new connection
+ ChannelPipeline clientPipeline = clientPipelineFactory.getPipeline();
+
+ if (isTcp()) {
+ ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
+ clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+ clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+ clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+ clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+
+ // set the pipeline on the bootstrap
+ clientBootstrap.setPipeline(clientPipeline);
+ answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ return answer;
+ } else {
+ ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
@@ -212,52 +257,38 @@ public class NettyProducer extends Defau
connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
- }
- }
-
- private void openConnection() throws Exception {
- ChannelFuture channelFuture;
-
- // initialize client pipeline factory
- if (clientPipelineFactory == null) {
- clientPipelineFactory = new ClientPipelineFactory(this);
- }
- // must get the pipeline from the factory when opening a new connection
- clientPipeline = clientPipelineFactory.getPipeline();
-
- if (clientBootstrap != null) {
- clientBootstrap.setPipeline(clientPipeline);
- channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- } else if (connectionlessClientBootstrap != null) {
+ // set the pipeline on the bootstrap
connectionlessClientBootstrap.setPipeline(clientPipeline);
connectionlessClientBootstrap.bind(new InetSocketAddress(0));
- channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- } else {
- throw new IllegalStateException("Should either be TCP or UDP");
+ answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ return answer;
}
+ }
+ private Channel openChannel(ChannelFuture channelFuture) throws Exception {
+ // wait until we got connection
channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
}
- channel = channelFuture.getChannel();
+ Channel channel = channelFuture.getChannel();
// to keep track of all channels in use
- allChannels.add(channel);
+ ALL_CHANNELS.add(channel);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating connector to address: " + configuration.getAddress());
}
+ return channel;
}
- private void closeConnection() throws Exception {
- // close all channels
- ChannelGroupFuture future = allChannels.close();
- future.awaitUninterruptibly();
-
- // and then release other resources
- if (channelFactory != null) {
- channelFactory.releaseExternalResources();
- }
+ private void openAndCloseConnection() throws Exception {
+ ChannelFuture future = openConnection(new DefaultExchange(context), new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ }
+ });
+ Channel channel = openChannel(future);
+ NettyHelper.close(channel);
}
public NettyConfiguration getConfiguration() {
@@ -268,10 +299,6 @@ public class NettyProducer extends Defau
this.configuration = configuration;
}
- public CountDownLatch getCountdownLatch() {
- return countdownLatch;
- }
-
public ChannelFactory getChannelFactory() {
return channelFactory;
}
@@ -280,23 +307,7 @@ public class NettyProducer extends Defau
this.channelFactory = channelFactory;
}
- public ClientBootstrap getClientBootstrap() {
- return clientBootstrap;
- }
-
- public void setClientBootstrap(ClientBootstrap clientBootstrap) {
- this.clientBootstrap = clientBootstrap;
- }
-
- public ClientPipelineFactory getClientPipelineFactory() {
- return clientPipelineFactory;
- }
-
- public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) {
- this.clientPipelineFactory = clientPipelineFactory;
- }
-
public ChannelGroup getAllChannels() {
- return allChannels;
+ return ALL_CHANNELS;
}
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Mon Jul 5 15:17:21 2010
@@ -17,7 +17,6 @@
package org.apache.camel.component.netty;
import java.util.List;
-
import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
@@ -49,21 +48,19 @@ public class ServerPipelineFactory imple
}
channelPipeline.addLast("ssl", sslHandler);
}
- List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
- for (int x = 0; x < decoders.size(); x++) {
- channelPipeline.addLast("decoder-" + x, decoders.get(x));
- }
-
List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders();
for (int x = 0; x < encoders.size(); x++) {
channelPipeline.addLast("encoder-" + x, encoders.get(x));
}
- if (consumer.getConfiguration().getHandler() != null) {
- channelPipeline.addLast("handler", consumer.getConfiguration().getHandler());
- } else {
- channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+
+ List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
+ for (int x = 0; x < decoders.size(); x++) {
+ channelPipeline.addLast("decoder-" + x, decoders.get(x));
}
-
+
+ // our handler must be added last
+ channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+
return channelPipeline;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Mon Jul 5 15:17:21 2010
@@ -16,37 +16,41 @@
*/
package org.apache.camel.component.netty.handlers;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.camel.CamelException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.component.netty.NettyHelper;
+import org.apache.camel.component.netty.NettyPayloadHelper;
import org.apache.camel.component.netty.NettyProducer;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.timeout.TimeoutException;
-@ChannelPipelineCoverage("all")
+/**
+ * Client handler which cannot be shared
+ */
public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class);
- private NettyProducer producer;
- private Object message;
- private Throwable cause;
+ private final NettyProducer producer;
+ private final Exchange exchange;
+ private final AsyncCallback callback;
private boolean messageReceived;
+ private boolean exceptionHandled;
- public ClientChannelHandler(NettyProducer producer) {
+ public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) {
super();
this.producer = producer;
- }
-
- public void reset() {
- this.message = null;
- this.cause = null;
- this.messageReceived = false;
+ this.exchange = exchange;
+ this.callback = callback;
}
@Override
@@ -57,59 +61,109 @@ public class ClientChannelHandler extend
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
- this.message = null;
- this.messageReceived = false;
- this.cause = exceptionEvent.getCause();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exception caught at Channel: " + ctx.getChannel(), exceptionEvent.getCause());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing channel as an exception was thrown from Netty", cause);
}
- // close channel in case an exception was thrown
- NettyHelper.close(exceptionEvent.getChannel());
+ if (exceptionHandled) {
+ // ignore subsequent exceptions being thrown
+ return;
+ }
+
+ exceptionHandled = true;
+ Throwable cause = exceptionEvent.getCause();
+
+ // was it the timeout
+ if (cause instanceof TimeoutException) {
+ // timeout occurred
+ exchange.setException(new ExchangeTimedOutException(exchange, producer.getConfiguration().getTimeout()));
+
+ // signal callback
+ callback.done(false);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel as an exception was thrown from Netty", cause);
+ }
+ // set the cause on the exchange
+ exchange.setException(cause);
+
+ // close channel in case an exception was thrown
+ NettyHelper.close(exceptionEvent.getChannel());
+
+ // signal callback
+ callback.done(false);
+ }
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- if (producer.getConfiguration().isSync() && !messageReceived) {
- // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel closed: " + ctx.getChannel());
+ }
+
+ if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+ // session was closed but no message received. This could be because the remote server had an internal error
+ // and could not return a response. We should count down to stop waiting for a response
if (LOG.isDebugEnabled()) {
LOG.debug("Channel closed but no message received from address: " + producer.getConfiguration().getAddress());
}
- // session was closed but no message received. This could be because the remote server had an internal error
- // and could not return a response. We should count down to stop waiting for a response
- countDown();
+ exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+ // signal callback
+ callback.done(false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
- message = messageEvent.getMessage();
messageReceived = true;
- cause = null;
+ Object body = messageEvent.getMessage();
if (LOG.isDebugEnabled()) {
- LOG.debug("Message received: " + message);
+ LOG.debug("Message received: " + body);
+ }
+
+ // if textline enabled then covert to a String which must be used for textline
+ if (producer.getConfiguration().isTextline()) {
+ try {
+ body = producer.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+ } catch (NoTypeConversionAvailableException e) {
+ exchange.setException(e);
+ callback.done(false);
+ }
}
- // signal we have received message
- countDown();
- }
- protected void countDown() {
- if (producer.getConfiguration().isSync()) {
- producer.getCountdownLatch().countDown();
+ // set the result on either IN or OUT on the original exchange depending on its pattern
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ NettyPayloadHelper.setOut(exchange, body);
+ } else {
+ NettyPayloadHelper.setIn(exchange, body);
}
- }
- public Object getMessage() {
- return message;
- }
+ try {
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ }
- public boolean isMessageReceived() {
- return messageReceived;
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = producer.getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel when complete at address: " + producer.getConfiguration().getAddress());
+ }
+ NettyHelper.close(ctx.getChannel());
+ }
+ } finally {
+ // signal callback
+ callback.done(false);
+ }
}
- public Throwable getCause() {
- return cause;
- }
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Mon Jul 5 15:17:21 2010
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.netty.handlers;
-import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.component.netty.NettyConstants;
@@ -27,14 +26,17 @@ import org.apache.camel.processor.Logger
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-@ChannelPipelineCoverage("all")
+/**
+ * Server handler which is shared
+ */
+@ChannelHandler.Sharable
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class);
private NettyConsumer consumer;
@@ -47,40 +49,48 @@ public class ServerChannelHandler extend
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel open: " + e.getChannel());
+ }
// to keep track of open sockets
- consumer.getAllChannels().add(channelStateEvent.getChannel());
+ consumer.getAllChannels().add(e.getChannel());
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("Channel closed: " + e.getChannel());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel closed: " + e.getChannel());
+ }
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
- LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+ // only close if we are still allowed to run
+ if (consumer.isRunAllowed()) {
+ LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
- // close channel in case an exception was thrown
- NettyHelper.close(exceptionEvent.getChannel());
+ // close channel in case an exception was thrown
+ NettyHelper.close(exceptionEvent.getChannel());
+ }
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
Object in = messageEvent.getMessage();
if (LOG.isDebugEnabled()) {
- if (in instanceof byte[]) {
- // byte arrays is not readable so convert to string
- in = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
- }
LOG.debug("Incoming message: " + in);
}
-
+
// create Exchange and let the consumer process it
Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
if (consumer.getConfiguration().isSync()) {
exchange.setPattern(ExchangePattern.InOut);
}
+ // set the exchange charset property for converting
+ if (consumer.getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, consumer.getConfiguration().getCharsetName());
+ }
try {
consumer.getProcessor().process(exchange);
@@ -123,14 +133,19 @@ public class ServerChannelHandler extend
NettyHelper.close(messageEvent.getChannel());
}
} else {
+ // if textline enabled then covert to a String which must be used for textline
+ if (consumer.getConfiguration().isTextline()) {
+ body = consumer.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+ }
+
// we got a body to write
if (LOG.isDebugEnabled()) {
LOG.debug("Writing body: " + body);
}
if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
- NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
+ NettyHelper.writeBodySync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);
} else {
- NettyHelper.writeBody(messageEvent.getChannel(), null, body, exchange);
+ NettyHelper.writeBodySync(messageEvent.getChannel(), null, body, exchange);
}
}
Added: camel/trunk/components/camel-netty/src/test/data/message1.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/data/message1.txt?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/data/message1.txt (added)
+++ camel/trunk/components/camel-netty/src/test/data/message1.txt Mon Jul 5 15:17:21 2010
@@ -0,0 +1 @@
+Hello World
\ No newline at end of file
Propchange: camel/trunk/components/camel-netty/src/test/data/message1.txt
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/data/message1.txt
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java Mon Jul 5 15:17:21 2010
@@ -20,7 +20,6 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
@@ -37,17 +36,14 @@ public class NettyExchangeTimeoutTest ex
public void testUsingTimeoutParameter() throws Exception {
// use a timeout value of 2 seconds (timeout is in millis) so we should actually get a response in this test
Endpoint endpoint = this.context.getEndpoint("netty:tcp://localhost:" + PORT + "?timeout=2000");
- Producer producer = endpoint.createProducer();
- producer.start();
- Exchange exchange = producer.createExchange();
- exchange.getIn().setBody("Hello World");
+
try {
- producer.process(exchange);
- fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException");
+ template.sendBody(endpoint, "Hello World");
+ fail("Should have thrown a exception");
} catch (Exception e) {
- assertTrue("Should have thrown an ExchangeTimedOutException", e instanceof ExchangeTimedOutException);
+ ExchangeTimedOutException timeout = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+ assertEquals(2000, timeout.getTimeout());
}
- producer.stop();
}
protected RouteBuilder createRouteBuilder() {
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java Mon Jul 5 15:17:21 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyFileTcpTest extends CamelTestSupport {
+
+ @Test
+ public void testMinaRoute() throws Exception {
+ MockEndpoint endpoint = getMockEndpoint("mock:results");
+ endpoint.expectedMessageCount(1);
+ endpoint.message(0).body().startsWith("Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // lets setup a server
+ from("netty:tcp://localhost:9123?sync=false&textline=true")
+ .to("mock:results");
+
+ from("file:src/test/data?noop=true&fileName=message1.txt").
+ to("netty:tcp://localhost:9123?sync=false&textline=true");
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyFileTcpTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Mon Jul 5 15:17:21 2010
@@ -62,7 +62,8 @@ public class NettyInOutWithForcedNoRespo
.when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
.otherwise().transform(constant(null));
- from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+ from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=INFO")
+ .to("log:foo")
.choice()
.when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
.otherwise().transform(constant(null));
Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java Mon Jul 5 15:17:21 2010
@@ -82,6 +82,7 @@ public class NettyTCPAsyncTest extends C
@Override
public void configure() throws Exception {
from("netty:tcp://localhost:5150?sync=false")
+ .to("log:result")
.to("mock:result");
}
};
Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java Mon Jul 5 15:17:21 2010
@@ -34,34 +34,31 @@ public class NettyTCPSyncTest extends Ca
@Test
public void testTCPStringInOutWithNettyConsumer() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Beginning Test ---> testTCPInOutWithNettyConsumer()");
- }
-
String response = producerTemplate.requestBody(
"netty:tcp://localhost:5150?sync=true",
"Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class);
assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Completed Test ---> testTCPInOutWithNettyConsumer()");
- }
+ }
+
+ @Test
+ public void testTCPStringInOutWithNettyConsumer2Times() throws Exception {
+ String response = producerTemplate.requestBody(
+ "netty:tcp://localhost:5150?sync=true",
+ "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class);
+ assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
+
+ response = producerTemplate.requestBody(
+ "netty:tcp://localhost:5150?sync=true",
+ "Hello World", String.class);
+ assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
}
@Test
public void testTCPObjectInOutWithNettyConsumer() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Beginning Test ---> testUDPInOutWithNettyConsumer()");
- }
-
Poetry poetry = new Poetry();
Poetry response = (Poetry) producerTemplate.requestBody("netty:tcp://localhost:5150?sync=true", poetry);
assertEquals("Dr. Sarojini Naidu", response.getPoet());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Completed Test ---> testUDPInOutWithNettyConsumer()");
- }
- }
+ }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java Mon Jul 5 15:17:21 2010
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTcpWithInOutUsingPlainSocketTest extends CamelTestSupport {
+
+ private static final int PORT = 6333;
+ // use parameter sync=true to force InOut pattern
+ protected String uri = "netty:tcp://localhost:" + PORT + "?textline=true&sync=true";
+
+ @Test
+ public void testSendAndReceiveOnce() throws Exception {
+ String response = sendAndReceive("World");
+
+ assertNotNull("Nothing received from Mina", response);
+ assertEquals("Hello World", response);
+ }
+
+ @Test
+ public void testSendAndReceiveTwice() throws Exception {
+ String london = sendAndReceive("London");
+ String paris = sendAndReceive("Paris");
+
+ assertNotNull("Nothing received from Mina", london);
+ assertNotNull("Nothing received from Mina", paris);
+ assertEquals("Hello London", london);
+ assertEquals("Hello Paris", paris);
+ }
+
+ @Test
+ public void testReceiveNoResponseSinceOutBodyIsNull() throws Exception {
+ String out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+ }
+
+ @Test
+ public void testReceiveNoResponseSinceOutBodyIsNullTwice() throws Exception {
+ String out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+
+ out = sendAndReceive("force-null-out-body");
+ assertNull("no data should be recieved", out);
+ }
+
+ @Test
+ public void testExchangeFailedOutShouldBeNull() throws Exception {
+ String out = sendAndReceive("force-exception");
+ assertTrue("out should not be the same as in when the exchange has failed", !"force-exception".equals(out));
+ assertEquals("should get the exception here", out, "java.lang.IllegalArgumentException: Forced exception");
+ }
+
+ private String sendAndReceive(String input) throws IOException {
+ byte buf[] = new byte[128];
+
+ Socket soc = new Socket();
+ soc.connect(new InetSocketAddress("localhost", PORT));
+
+ // Send message using plain Socket to test if this works
+ OutputStream os = null;
+ InputStream is = null;
+ try {
+ os = soc.getOutputStream();
+ os.write(input.getBytes());
+
+ is = soc.getInputStream();
+ int len = is.read(buf);
+ if (len == -1) {
+ // no data received
+ return null;
+ }
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ if (os != null) {
+ os.close();
+ }
+ soc.close();
+ }
+
+ // convert the buffer to chars
+ StringBuilder sb = new StringBuilder();
+ for (byte b : buf) {
+ char ch = (char) b;
+ if (ch == '\n' || ch == 0) {
+ // newline denotes end of text (added in the end in the processor below)
+ break;
+ } else {
+ sb.append(ch);
+ }
+ }
+
+ return sb.toString();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) {
+ String in = e.getIn().getBody(String.class);
+ if ("force-null-out-body".equals(in)) {
+ // forcing a null out body
+ e.getOut().setBody(null);
+ } else if ("force-exception".equals(in)) {
+ // clear out before throwing exception
+ e.getOut().setBody(null);
+ throw new IllegalArgumentException("Forced exception");
+ } else {
+ e.getOut().setBody("Hello " + in);
+ }
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java Mon Jul 5 15:17:21 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTextlineInOnlyTest extends CamelTestSupport {
+
+ @Test
+ public void testTextlineInOnly() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World\nHow are you?");
+
+ template.sendBody("netty:tcp://localhost:5149?textline=true&sync=false", "Hello World\nHow are you?");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:5149?textline=true&sync=false")
+ // body should be a String when using textline codec
+ .validate(body().isInstanceOf(String.class))
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java Mon Jul 5 15:17:21 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTextlineInOutTest extends CamelTestSupport {
+
+ @Test
+ public void testTextlineInOut() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ String reply = template.requestBody("netty:tcp://localhost:5148?textline=true&sync=true", "Hello World", String.class);
+ assertEquals("Bye World", reply);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:5148?textline=true&sync=true")
+ // body should be a String when using textline codec
+ .validate(body().isInstanceOf(String.class))
+ .to("mock:result")
+ .transform(body().regexReplaceAll("Hello", "Bye"));
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java?rev=960621&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java Mon Jul 5 15:17:21 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyUdpWithInOutUsingPlainSocketTest extends CamelTestSupport {
+ private static final transient Log LOG = LogFactory.getLog(NettyUdpWithInOutUsingPlainSocketTest.class);
+ private static final int PORT = 4445;
+
+ @Test
+ public void testSendAndReceiveOnce() throws Exception {
+ String out = sendAndReceiveUdpMessages("World");
+ assertNotNull("should receive data", out);
+ assertEquals("Hello World", out);
+ }
+
+ private String sendAndReceiveUdpMessages(String input) throws Exception {
+ DatagramSocket socket = new DatagramSocket();
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+
+ byte[] data = input.getBytes();
+
+ DatagramPacket packet = new DatagramPacket(data, data.length, address, PORT);
+ LOG.debug("+++ Sending data +++");
+ socket.send(packet);
+
+ Thread.sleep(1000);
+
+ byte[] buf = new byte[128];
+ DatagramPacket receive = new DatagramPacket(buf, buf.length, address, PORT);
+ LOG.debug("+++ Receiving data +++");
+ socket.receive(receive);
+
+ socket.close();
+
+ return new String(receive.getData(), 0, receive.getLength());
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("netty:udp://127.0.0.1:" + PORT + "?textline=true&sync=true").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String s = exchange.getIn().getBody(String.class);
+ LOG.debug("Server got: " + s);
+ exchange.getOut().setBody("Hello " + s);
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=960621&r1=960620&r2=960621&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Mon Jul 5 15:17:21 2010
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
-log4j.logger.org.apache.camel.component.netty=DEBUG
+#log4j.logger.org.apache.camel.component.netty=TRACE
#log4j.logger.org.apache.camel=DEBUG
#log4j.logger.org.apache.commons.net=TRACE